Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
<a id="user-content-opendht-" class="anchor" href="/savoirfairelinux/opendht/blob/master/README.md#opendht-" aria-hidden="true"></a>OpenDHT
</h1>

![PyPI - Version](https://img.shields.io/pypi/v/opendht?style=flat)

A lightweight C++17 Distributed Hash Table implementation.

OpenDHT provides an easy to use distributed in-memory data store.
Expand Down
27 changes: 16 additions & 11 deletions src/securedht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,30 @@ SecureDht::SecureDht(std::unique_ptr<DhtInterface> dht, SecureDht::Config conf,
auto certLongId = certificate_->getLongId();
if (key_ and (certId != key_->getPublicKey().getId() or certLongId != key_->getPublicKey().getLongId()))
throw DhtException("SecureDht: provided certificate doesn't match private key.");
dht_->addOnConnectedCallback([this, certId, certLongId, cb=std::move(iacb)]{

auto cb = iacb ? [
this,
completedState = std::make_shared<std::pair<unsigned,bool>>(2, true),
iacb = std::move(iacb)
](bool ok) {
if (logger_)
logger_->d("SecureDht: certificate announcement %s", ok ? "succeeded" : "failed");
completedState->second = completedState->second and ok;
if (--completedState->first == 0)
iacb(completedState->second);
} : iacb;

dht_->addOnConnectedCallback([this, certId, certLongId, cb=std::move(cb)]{
dht_->put(certId, Value {
CERTIFICATE_TYPE,
*certificate_,
1
}, [this, certId, cb=std::move(cb)](bool ok) {
if (cb) cb(ok);
if (logger_)
logger_->d(certId, "SecureDht: certificate announcement %s", ok ? "succeeded" : "failed");
}, {}, true);
}, cb, {}, true);
dht_->put(InfoHash::get(certLongId), Value {
CERTIFICATE_TYPE,
*certificate_,
1
}, [this, cb=std::move(cb)](bool ok) {
if (cb) cb(ok);
if (logger_)
logger_->debug("SecureDht: certificate announcement {}", ok ? "succeeded" : "failed");
}, {}, true);
}, cb, {}, true);
});
}
}
Expand Down
79 changes: 47 additions & 32 deletions tests/dhtrunnertester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ DhtRunnerTester::tearDown() {
std::condition_variable cv;
std::mutex cv_m;
auto shutdown = [&]{
std::lock_guard<std::mutex> lk(cv_m);
std::lock_guard lk(cv_m);
done++;
cv.notify_all();
};
node1.shutdown(shutdown);
node2.shutdown(shutdown);
std::unique_lock<std::mutex> lk(cv_m);
std::unique_lock lk(cv_m);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return done == 2u; }));
node1.join();
node2.join();
Expand Down Expand Up @@ -207,29 +207,29 @@ DhtRunnerTester::testListen() {

for (unsigned i=0; i<N; i++) {
node2.put(a, dht::Value("v1"), [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
putCount++;
if (ok) putOkCount1++;
cv.notify_all();
});
node2.put(b, dht::Value("v2"), [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
putCount++;
if (ok) putOkCount2++;
cv.notify_all();
});
auto bigVal = std::make_shared<dht::Value>();
bigVal->data = mtu;
node2.put(c, std::move(bigVal), [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
putCount++;
if (ok) putOkCount3++;
cv.notify_all();
});
}

{
std::unique_lock<std::mutex> lk(mutex);
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return putCount == N * 3u; }));
CPPUNIT_ASSERT_EQUAL(N, putOkCount1);
CPPUNIT_ASSERT_EQUAL(N, putOkCount2);
Expand Down Expand Up @@ -264,6 +264,7 @@ void
DhtRunnerTester::testIdOps() {
std::mutex mutex;
std::condition_variable cv;
unsigned identityCount(0);
unsigned valueCount(0);
unsigned valueCountEdit(0);

Expand All @@ -275,8 +276,8 @@ DhtRunnerTester::testIdOps() {
dht::DhtRunner::Context context2;
context2.identityAnnouncedCb = [&](bool ok) {
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
valueCount++;
std::lock_guard lk(mutex);
identityCount++;
cv.notify_all();
};

Expand All @@ -288,27 +289,28 @@ DhtRunnerTester::testIdOps() {
node2.bootstrap(bound);

{
std::unique_lock<std::mutex> lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 1; }));
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return identityCount == 1; }));
}

node1.findCertificate(node2.getId(), [&](const std::shared_ptr<dht::crypto::Certificate>& crt){
CPPUNIT_ASSERT(crt);
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCount++;
cv.notify_all();
});

{
std::unique_lock<std::mutex> lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 2; }));
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 1u; }));
CPPUNIT_ASSERT_EQUAL(1u, identityCount);
}

dht::DhtRunner::Context context1;
context1.identityAnnouncedCb = [&](bool ok) {
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
valueCount++;
std::lock_guard lk(mutex);
identityCount++;
cv.notify_all();
};

Expand All @@ -323,21 +325,21 @@ DhtRunnerTester::testIdOps() {
auto key = dht::InfoHash::get("key");
node1.putEncrypted(key, node2.getId(), dht::Value("yo"), [&](bool ok){
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCount++;
cv.notify_all();
});

node1.putEncrypted(key, node2.getPublicKey(), std::make_shared<dht::Value>("yo"), [&](bool ok){
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCount++;
cv.notify_all();
});

node2.listen<std::string>(key, [&](std::string&& value){
CPPUNIT_ASSERT_EQUAL("yo"s, value);
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCount++;
cv.notify_all();
return true;
Expand All @@ -347,7 +349,7 @@ DhtRunnerTester::testIdOps() {
auto editValue = std::make_shared<dht::Value>("v1");
node1.putSigned(key2, editValue, [&](bool ok){
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCountEdit++;
cv.notify_all();
});
Expand All @@ -359,29 +361,42 @@ DhtRunnerTester::testIdOps() {
CPPUNIT_ASSERT_EQUAL("v2"s, dht::unpackMsg<std::string>(v->data));
CPPUNIT_ASSERT_EQUAL(v->owner->getLongId(), node1.getPublicKey()->getLongId());
}
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCountEdit += values.size();
cv.notify_all();
return true;
});

{
std::unique_lock<std::mutex> lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 2; }));
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return identityCount == 2u; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 5u; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 2u; }));
}

node2.findCertificate(node1.getPublicKey()->getLongId(), [&](const std::shared_ptr<dht::crypto::Certificate>& crt){
CPPUNIT_ASSERT(crt);
std::lock_guard lk(mutex);
valueCount++;
cv.notify_all();
});

{
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 6u; }));
}

// editValue->data = dht::packMsg("v2");
editValue = std::make_shared<dht::Value>(editValue->id);
editValue->data = dht::packMsg("v2");
node1.putSigned(key2, editValue, [&](bool ok){
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
std::lock_guard lk(mutex);
valueCountEdit++;
cv.notify_all();
});
std::unique_lock<std::mutex> lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4; }));
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4u; }));
}

void
Expand All @@ -399,14 +414,14 @@ DhtRunnerTester::testListenLotOfBytes() {

for (unsigned i=0; i<N; i++) {
node2.put(foo, data, [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
putCount++;
if (ok) putOkCount++;
cv.notify_all();
});
}
{
std::unique_lock<std::mutex> lk(mutex);
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return putCount == N; }));
}

Expand All @@ -427,7 +442,7 @@ DhtRunnerTester::testListenLotOfBytes() {
});

{
std::unique_lock<std::mutex> lk(mutex);
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return valueCount == N; }));
}

Expand All @@ -447,22 +462,22 @@ DhtRunnerTester::testMultithread() {
for (unsigned i=0; i<N; i++) {
dht::ThreadPool::computation().run([&]{
node2.put(dht::InfoHash::get("123" + std::to_string(i)), "hehe", [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
putCount++;
if (ok) putOkCount++;
cv.notify_all();
});
node2.get(dht::InfoHash::get("123" + std::to_string(N-i-1)), [](const std::shared_ptr<dht::Value>&){
return true;
}, [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
putCount++;
if (ok) putOkCount++;
cv.notify_all();
});
});
}
std::unique_lock<std::mutex> lk(mutex);
std::unique_lock lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return putCount == 2*N; }));
CPPUNIT_ASSERT_EQUAL(2*N, putOkCount);

Expand Down
Loading