Skip to content

Commit 5a7e8dc

Browse files
committed
search: combine callbacks in case of multiple equivalent puts
1 parent c4857bf commit 5a7e8dc

File tree

4 files changed

+60
-20
lines changed

4 files changed

+60
-20
lines changed

src/dht.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Dht::shutdown(ShutdownCallback cb, bool stop)
9393
r.second.done_cb(false, {});
9494
sr.second->callbacks.clear();
9595
for (const auto& a : sr.second->announce) {
96-
if (a.callback) a.callback(false, {});
96+
for (auto& cb : a.callbacks) cb(false, {});
9797
}
9898
sr.second->announce.clear();
9999
sr.second->listeners.clear();

src/search.h

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ struct Dht::Announce {
4545
bool permanent;
4646
Sp<Value> value;
4747
time_point created;
48-
DoneCallback callback;
48+
std::vector<DoneCallback> callbacks;
4949
};
5050

5151
struct Dht::SearchNode {
@@ -442,8 +442,9 @@ struct Dht::Search {
442442
g.second.done_cb = {};
443443
}
444444
for (auto& a : announce) {
445-
a.callback(false, {});
446-
a.callback = {};
445+
for (auto& cb : a.callbacks)
446+
cb(false, {});
447+
a.callbacks.clear();
447448
}
448449
}
449450

@@ -627,32 +628,36 @@ struct Dht::Search {
627628
return a.value->id == value->id;
628629
});
629630
if (a_sr == announce.end()) {
630-
announce.emplace_back(Announce {permanent, value, created, callback});
631+
auto& a = announce.emplace_back(Announce {permanent, value, created, {}} );
632+
if (callback)
633+
a.callbacks.emplace_back(std::move(callback));
631634
for (auto& n : nodes) {
632635
n->probe_query.reset();
633636
n->acked[value->id].req.reset();
634637
}
635638
} else {
636639
a_sr->permanent = permanent;
637640
a_sr->created = created;
638-
if (a_sr->value != value) {
641+
if (a_sr->value != value && *a_sr->value != *value) {
642+
// Value is updated, previous ops are failed
639643
a_sr->value = value;
640644
for (auto& n : nodes) {
641645
n->acked[value->id].req.reset();
642646
n->probe_query.reset();
643647
}
644-
}
645-
if (isAnnounced(value->id)) {
646-
if (a_sr->callback)
647-
a_sr->callback(true, {});
648-
a_sr->callback = {};
648+
for (auto& cb: a_sr->callbacks)
649+
cb(false, {});
650+
a_sr->callbacks.clear();
651+
if (callback)
652+
a_sr->callbacks.emplace_back(std::move(callback));
653+
} else if (isAnnounced(value->id)) {
654+
// Same value, already announced
649655
if (callback)
650656
callback(true, {});
651-
return;
652657
} else {
653-
if (a_sr->callback)
654-
a_sr->callback(false, {});
655-
a_sr->callback = callback;
658+
// Same value, not announced yet
659+
if (callback)
660+
a_sr->callbacks.emplace_back(std::move(callback));
656661
}
657662
}
658663
}
@@ -722,8 +727,8 @@ struct Dht::Search {
722727
std::vector<DoneCallback> a_cbs;
723728
a_cbs.reserve(announce.size());
724729
for (auto ait = announce.begin() ; ait != announce.end(); ) {
725-
if (ait->callback)
726-
a_cbs.emplace_back(std::move(ait->callback));
730+
a_cbs.insert(a_cbs.end(), std::make_move_iterator(ait->callbacks.begin()), std::make_move_iterator(ait->callbacks.end()));
731+
ait->callbacks.clear();
727732
if (not ait->permanent)
728733
ait = announce.erase(ait);
729734
else
@@ -747,9 +752,11 @@ struct Dht::Search {
747752
if (vid != Value::INVALID_ID and (!a.value || a.value->id != vid))
748753
return true;
749754
if (isAnnounced(a.value->id)) {
750-
if (a.callback) {
751-
a.callback(true, getNodes());
752-
a.callback = nullptr;
755+
if (!a.callbacks.empty()) {
756+
const auto& nodes = getNodes();
757+
for (auto& cb : a.callbacks)
758+
cb(true, nodes);
759+
a.callbacks.clear();
753760
}
754761
if (not a.permanent)
755762
return false;

tests/dhtrunnertester.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,34 @@ DhtRunnerTester::testPutDuplicate() {
115115
}
116116

117117

118+
void
119+
DhtRunnerTester::testPutOverride() {
120+
auto key = dht::InfoHash::get("123");
121+
auto val = std::make_shared<dht::Value>("meh");
122+
val->id = 42;
123+
auto val2 = std::make_shared<dht::Value>("hey");
124+
val2->id = 42;
125+
CPPUNIT_ASSERT_EQUAL(val->id, val2->id);
126+
auto val_data = val2->data;
127+
std::promise<bool> p1;
128+
std::promise<bool> p2;
129+
node2.put(key, val, [&](bool ok){
130+
p1.set_value(ok);
131+
});
132+
node2.put(key, val2, [&](bool ok){
133+
p2.set_value(ok);
134+
});
135+
auto p1ret = p1.get_future().get();
136+
auto p2ret = p2.get_future().get();
137+
CPPUNIT_ASSERT(!p1ret);
138+
CPPUNIT_ASSERT(p2ret);
139+
auto vals = node1.get(key).get();
140+
CPPUNIT_ASSERT(not vals.empty());
141+
CPPUNIT_ASSERT(vals.size() == 1);
142+
CPPUNIT_ASSERT(vals.front()->data == val_data);
143+
}
144+
145+
118146
void
119147
DhtRunnerTester::testListen() {
120148
std::mutex mutex;

tests/dhtrunnertester.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class DhtRunnerTester : public CppUnit::TestFixture {
3232
CPPUNIT_TEST(testConstructors);
3333
CPPUNIT_TEST(testGetPut);
3434
CPPUNIT_TEST(testPutDuplicate);
35+
CPPUNIT_TEST(testPutOverride);
3536
CPPUNIT_TEST(testListen);
3637
CPPUNIT_TEST(testListenLotOfBytes);
3738
CPPUNIT_TEST(testIdOps);
@@ -60,6 +61,10 @@ class DhtRunnerTester : public CppUnit::TestFixture {
6061
* Test get and multiple put
6162
*/
6263
void testPutDuplicate();
64+
/**
65+
* Test get and multiple put with changing value
66+
*/
67+
void testPutOverride();
6368
/**
6469
* Test listen method
6570
*/

0 commit comments

Comments
 (0)