diff --git a/README.md b/README.md index 4f56cc375..abbba65f2 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ OpenDHT +![PyPI - Version](https://img.shields.io/pypi/v/opendht?style=flat) + A lightweight C++17 Distributed Hash Table implementation. OpenDHT provides an easy to use distributed in-memory data store. diff --git a/src/securedht.cpp b/src/securedht.cpp index 796b2457b..f73954720 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -53,25 +53,30 @@ SecureDht::SecureDht(std::unique_ptr dht, SecureDht::Config conf, auto certLongId = certificate_->getLongId(); if (key_ and (certId != key_->getPublicKey().getId() or certLongId != key_->getPublicKey().getLongId())) throw DhtException("SecureDht: provided certificate doesn't match private key."); - dht_->addOnConnectedCallback([this, certId, certLongId, cb=std::move(iacb)]{ + + auto cb = iacb ? [ + this, + completedState = std::make_shared>(2, true), + iacb = std::move(iacb) + ](bool ok) { + if (logger_) + logger_->d("SecureDht: certificate announcement %s", ok ? "succeeded" : "failed"); + completedState->second = completedState->second and ok; + if (--completedState->first == 0) + iacb(completedState->second); + } : iacb; + + dht_->addOnConnectedCallback([this, certId, certLongId, cb=std::move(cb)]{ dht_->put(certId, Value { CERTIFICATE_TYPE, *certificate_, 1 - }, [this, certId, cb=std::move(cb)](bool ok) { - if (cb) cb(ok); - if (logger_) - logger_->d(certId, "SecureDht: certificate announcement %s", ok ? "succeeded" : "failed"); - }, {}, true); + }, cb, {}, true); dht_->put(InfoHash::get(certLongId), Value { CERTIFICATE_TYPE, *certificate_, 1 - }, [this, cb=std::move(cb)](bool ok) { - if (cb) cb(ok); - if (logger_) - logger_->debug("SecureDht: certificate announcement {}", ok ? "succeeded" : "failed"); - }, {}, true); + }, cb, {}, true); }); } } diff --git a/tests/dhtrunnertester.cpp b/tests/dhtrunnertester.cpp index 923938e48..ad5460431 100644 --- a/tests/dhtrunnertester.cpp +++ b/tests/dhtrunnertester.cpp @@ -52,13 +52,13 @@ DhtRunnerTester::tearDown() { std::condition_variable cv; std::mutex cv_m; auto shutdown = [&]{ - std::lock_guard lk(cv_m); + std::lock_guard lk(cv_m); done++; cv.notify_all(); }; node1.shutdown(shutdown); node2.shutdown(shutdown); - std::unique_lock lk(cv_m); + std::unique_lock lk(cv_m); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return done == 2u; })); node1.join(); node2.join(); @@ -207,13 +207,13 @@ DhtRunnerTester::testListen() { for (unsigned i=0; i lock(mutex); + std::lock_guard lock(mutex); putCount++; if (ok) putOkCount1++; cv.notify_all(); }); node2.put(b, dht::Value("v2"), [&](bool ok) { - std::lock_guard lock(mutex); + std::lock_guard lock(mutex); putCount++; if (ok) putOkCount2++; cv.notify_all(); @@ -221,7 +221,7 @@ DhtRunnerTester::testListen() { auto bigVal = std::make_shared(); bigVal->data = mtu; node2.put(c, std::move(bigVal), [&](bool ok) { - std::lock_guard lock(mutex); + std::lock_guard lock(mutex); putCount++; if (ok) putOkCount3++; cv.notify_all(); @@ -229,7 +229,7 @@ DhtRunnerTester::testListen() { } { - std::unique_lock lk(mutex); + std::unique_lock lk(mutex); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return putCount == N * 3u; })); CPPUNIT_ASSERT_EQUAL(N, putOkCount1); CPPUNIT_ASSERT_EQUAL(N, putOkCount2); @@ -264,6 +264,7 @@ void DhtRunnerTester::testIdOps() { std::mutex mutex; std::condition_variable cv; + unsigned identityCount(0); unsigned valueCount(0); unsigned valueCountEdit(0); @@ -275,8 +276,8 @@ DhtRunnerTester::testIdOps() { dht::DhtRunner::Context context2; context2.identityAnnouncedCb = [&](bool ok) { CPPUNIT_ASSERT(ok); - std::lock_guard lk(mutex); - valueCount++; + std::lock_guard lk(mutex); + identityCount++; cv.notify_all(); }; @@ -288,27 +289,28 @@ DhtRunnerTester::testIdOps() { node2.bootstrap(bound); { - std::unique_lock lk(mutex); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 1; })); + std::unique_lock lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return identityCount == 1; })); } node1.findCertificate(node2.getId(), [&](const std::shared_ptr& crt){ CPPUNIT_ASSERT(crt); - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCount++; cv.notify_all(); }); { - std::unique_lock lk(mutex); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 2; })); + std::unique_lock lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 1u; })); + CPPUNIT_ASSERT_EQUAL(1u, identityCount); } dht::DhtRunner::Context context1; context1.identityAnnouncedCb = [&](bool ok) { CPPUNIT_ASSERT(ok); - std::lock_guard lk(mutex); - valueCount++; + std::lock_guard lk(mutex); + identityCount++; cv.notify_all(); }; @@ -323,21 +325,21 @@ DhtRunnerTester::testIdOps() { auto key = dht::InfoHash::get("key"); node1.putEncrypted(key, node2.getId(), dht::Value("yo"), [&](bool ok){ CPPUNIT_ASSERT(ok); - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCount++; cv.notify_all(); }); node1.putEncrypted(key, node2.getPublicKey(), std::make_shared("yo"), [&](bool ok){ CPPUNIT_ASSERT(ok); - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCount++; cv.notify_all(); }); node2.listen(key, [&](std::string&& value){ CPPUNIT_ASSERT_EQUAL("yo"s, value); - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCount++; cv.notify_all(); return true; @@ -347,7 +349,7 @@ DhtRunnerTester::testIdOps() { auto editValue = std::make_shared("v1"); node1.putSigned(key2, editValue, [&](bool ok){ CPPUNIT_ASSERT(ok); - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCountEdit++; cv.notify_all(); }); @@ -359,16 +361,29 @@ DhtRunnerTester::testIdOps() { CPPUNIT_ASSERT_EQUAL("v2"s, dht::unpackMsg(v->data)); CPPUNIT_ASSERT_EQUAL(v->owner->getLongId(), node1.getPublicKey()->getLongId()); } - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCountEdit += values.size(); cv.notify_all(); return true; }); { - std::unique_lock lk(mutex); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7; })); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 2; })); + std::unique_lock lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return identityCount == 2u; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 5u; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 2u; })); + } + + node2.findCertificate(node1.getPublicKey()->getLongId(), [&](const std::shared_ptr& crt){ + CPPUNIT_ASSERT(crt); + std::lock_guard lk(mutex); + valueCount++; + cv.notify_all(); + }); + + { + std::unique_lock lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 6u; })); } // editValue->data = dht::packMsg("v2"); @@ -376,12 +391,12 @@ DhtRunnerTester::testIdOps() { editValue->data = dht::packMsg("v2"); node1.putSigned(key2, editValue, [&](bool ok){ CPPUNIT_ASSERT(ok); - std::lock_guard lk(mutex); + std::lock_guard lk(mutex); valueCountEdit++; cv.notify_all(); }); - std::unique_lock lk(mutex); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4; })); + std::unique_lock lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4u; })); } void @@ -399,14 +414,14 @@ DhtRunnerTester::testListenLotOfBytes() { for (unsigned i=0; i lock(mutex); + std::lock_guard lock(mutex); putCount++; if (ok) putOkCount++; cv.notify_all(); }); } { - std::unique_lock lk(mutex); + std::unique_lock lk(mutex); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return putCount == N; })); } @@ -427,7 +442,7 @@ DhtRunnerTester::testListenLotOfBytes() { }); { - std::unique_lock lk(mutex); + std::unique_lock lk(mutex); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return valueCount == N; })); } @@ -447,7 +462,7 @@ DhtRunnerTester::testMultithread() { for (unsigned i=0; i lock(mutex); + std::lock_guard lock(mutex); putCount++; if (ok) putOkCount++; cv.notify_all(); @@ -455,14 +470,14 @@ DhtRunnerTester::testMultithread() { node2.get(dht::InfoHash::get("123" + std::to_string(N-i-1)), [](const std::shared_ptr&){ return true; }, [&](bool ok) { - std::lock_guard lock(mutex); + std::lock_guard lock(mutex); putCount++; if (ok) putOkCount++; cv.notify_all(); }); }); } - std::unique_lock lk(mutex); + std::unique_lock lk(mutex); CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]{ return putCount == 2*N; })); CPPUNIT_ASSERT_EQUAL(2*N, putOkCount);