Skip to content

Commit d827bf5

Browse files
committed
fix cuprated builds
1 parent 9ddf88d commit d827bf5

File tree

10 files changed

+127
-49
lines changed

10 files changed

+127
-49
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
resolver = "2"
33

44
members = [
5-
#"binaries/cuprated",
5+
"binaries/cuprated",
66
"constants",
77
"consensus",
88
"consensus/context",

binaries/cuprated/src/blockchain/syncer.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tracing::instrument;
1212
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
1313
use cuprate_p2p::{
1414
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
15-
NetworkInterface,
15+
NetworkInterface, PeerSetRequest, PeerSetResponse,
1616
};
1717
use cuprate_p2p_core::ClearNet;
1818

@@ -28,15 +28,11 @@ pub enum SyncerError {
2828
}
2929

3030
/// The syncer tasks that makes sure we are fully synchronised with our connected peers.
31-
#[expect(
32-
clippy::significant_drop_tightening,
33-
reason = "Client pool which will be removed"
34-
)]
3531
#[instrument(level = "debug", skip_all)]
3632
pub async fn syncer<C, CN>(
3733
mut context_svc: C,
3834
our_chain: CN,
39-
clearnet_interface: NetworkInterface<ClearNet>,
35+
mut clearnet_interface: NetworkInterface<ClearNet>,
4036
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
4137
stop_current_block_downloader: Arc<Notify>,
4238
block_downloader_config: BlockDownloaderConfig,
@@ -67,8 +63,6 @@ where
6763
unreachable!();
6864
};
6965

70-
let client_pool = clearnet_interface.client_pool();
71-
7266
tracing::debug!("Waiting for new sync info in top sync channel");
7367

7468
loop {
@@ -79,9 +73,20 @@ where
7973
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
8074
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
8175

82-
if !client_pool.contains_client_with_more_cumulative_difficulty(
83-
raw_blockchain_context.cumulative_difficulty,
84-
) {
76+
let PeerSetResponse::MostPoWSeen {
77+
cumulative_difficulty,
78+
..
79+
} = clearnet_interface
80+
.peer_set()
81+
.ready()
82+
.await?
83+
.call(PeerSetRequest::MostPoWSeen)
84+
.await?
85+
else {
86+
unreachable!();
87+
};
88+
89+
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
8590
continue;
8691
}
8792

binaries/cuprated/src/txpool/dandelion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandel
5959
diffuse_service::DiffuseService {
6060
clear_net_broadcast_service: clear_net.broadcast_svc(),
6161
},
62-
stem_service::OutboundPeerStream { clear_net },
62+
stem_service::OutboundPeerStream::new(clear_net),
6363
DANDELION_CONFIG,
6464
)
6565
}

binaries/cuprated/src/txpool/dandelion/stem_service.rs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1+
use bytes::Bytes;
2+
use futures::future::BoxFuture;
3+
use futures::{FutureExt, Stream};
4+
use std::future::Future;
5+
use std::task::ready;
16
use std::{
27
pin::Pin,
38
task::{Context, Poll},
49
};
5-
6-
use bytes::Bytes;
7-
use futures::Stream;
810
use tower::Service;
911

1012
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
11-
use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
13+
use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
1214
use cuprate_p2p_core::{
1315
client::{Client, InternalPeerID},
1416
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
@@ -19,7 +21,17 @@ use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
1921

2022
/// The dandelion outbound peer stream.
2123
pub struct OutboundPeerStream {
22-
pub clear_net: NetworkInterface<ClearNet>,
24+
clear_net: NetworkInterface<ClearNet>,
25+
state: OutboundPeerStreamState,
26+
}
27+
28+
impl OutboundPeerStream {
29+
pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
30+
Self {
31+
clear_net,
32+
state: OutboundPeerStreamState::Standby,
33+
}
34+
}
2335
}
2436

2537
impl Stream for OutboundPeerStream {
@@ -28,23 +40,46 @@ impl Stream for OutboundPeerStream {
2840
tower::BoxError,
2941
>;
3042

31-
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32-
// TODO: make the outbound peer choice random.
33-
Poll::Ready(Some(Ok(self
34-
.clear_net
35-
.client_pool()
36-
.outbound_client()
37-
.map_or(OutboundPeer::Exhausted, |client| {
38-
OutboundPeer::Peer(
39-
CrossNetworkInternalPeerId::ClearNet(client.info.id),
40-
StemPeerService(client),
41-
)
42-
}))))
43+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44+
loop {
45+
match &mut self.state {
46+
OutboundPeerStreamState::Standby => {
47+
let peer_set = self.clear_net.peer_set();
48+
let res = ready!(peer_set.poll_ready(cx));
49+
50+
self.state = OutboundPeerStreamState::AwaitingPeer(
51+
peer_set.call(PeerSetRequest::StemPeer).boxed(),
52+
);
53+
}
54+
OutboundPeerStreamState::AwaitingPeer(fut) => {
55+
let res = ready!(fut.poll_unpin(cx));
56+
57+
return Poll::Ready(Some(res.map(|res| {
58+
let PeerSetResponse::StemPeer(stem_peer) = res else {
59+
unreachable!()
60+
};
61+
62+
match stem_peer {
63+
Some(peer) => OutboundPeer::Peer(
64+
CrossNetworkInternalPeerId::ClearNet(peer.info.id),
65+
StemPeerService(peer),
66+
),
67+
None => OutboundPeer::Exhausted,
68+
}
69+
})));
70+
}
71+
}
72+
}
4373
}
4474
}
4575

76+
enum OutboundPeerStreamState {
77+
Standby,
78+
AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
79+
}
80+
4681
/// The stem service, used to send stem txs.
47-
pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>);
82+
pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);
4883

4984
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
5085
type Response = <Client<N> as Service<PeerRequest>>::Response;

p2p/p2p/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ rayon = { workspace = true }
2424
tokio-stream = { workspace = true, features = ["sync", "time"] }
2525
futures = { workspace = true, features = ["std"] }
2626
pin-project = { workspace = true }
27-
indexmap = { workspace = true }
27+
indexmap = { workspace = true, features = ["std"] }
2828

2929
thiserror = { workspace = true }
3030
bytes = { workspace = true, features = ["std"] }

p2p/p2p/src/block_downloader/tests.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ proptest! {
4949

5050
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
5151

52-
#[expect(clippy::significant_drop_tightening)]
5352
tokio_pool.block_on(async move {
5453
timeout(Duration::from_secs(600), async move {
5554
let (new_connection_tx, new_connection_rx) = mpsc::channel(peers);

p2p/p2p/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,10 @@ impl<N: NetworkZone> NetworkInterface<N> {
176176
self.address_book.clone()
177177
}
178178

179-
// Borrows the `ClientPool`, for access to connected peers.
180-
// pub const fn client_pool(&self) -> &Arc<ClientPool<N>> {
181-
// &self.pool
182-
//}
179+
/// Borrows the `ClientPool`, for access to connected peers.
180+
pub fn peer_set(
181+
&mut self,
182+
) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
183+
&mut self.peer_set
184+
}
183185
}

p2p/p2p/src/peer_set.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use cuprate_p2p_core::client::{Client, InternalPeerID};
2-
use cuprate_p2p_core::NetworkZone;
3-
use indexmap::IndexMap;
2+
use cuprate_p2p_core::{ConnectionDirection, NetworkZone};
3+
use indexmap::{IndexMap, IndexSet};
4+
use rand::seq::index;
5+
use rand::thread_rng;
46
use std::future::{ready, Ready};
57
use std::task::{Context, Poll};
68
use tokio::sync::mpsc;
@@ -15,6 +17,7 @@ use cuprate_helper::cast::u64_to_usize;
1517
pub enum PeerSetRequest {
1618
MostPoWSeen,
1719
PeersWithMorePoW(u128),
20+
StemPeer,
1821
}
1922

2023
pub enum PeerSetResponse<N: NetworkZone> {
@@ -24,23 +27,30 @@ pub enum PeerSetResponse<N: NetworkZone> {
2427
top_hash: [u8; 32],
2528
},
2629
PeersWithMorePoW(Vec<ClientDropGuard<N>>),
30+
StemPeer(Option<ClientDropGuard<N>>),
2731
}
2832

2933
pub(crate) struct PeerSet<N: NetworkZone> {
3034
peers: IndexMap<InternalPeerID<N::Addr>, StoredClient<N>>,
35+
outbound_peers: IndexSet<InternalPeerID<N::Addr>>,
3136
new_peers: mpsc::Receiver<Client<N>>,
3237
}
3338

3439
impl<N: NetworkZone> PeerSet<N> {
3540
pub(crate) fn new(new_peers: mpsc::Receiver<Client<N>>) -> Self {
3641
Self {
3742
peers: IndexMap::new(),
43+
outbound_peers: IndexSet::new(),
3844
new_peers,
3945
}
4046
}
4147

4248
fn poll_new_peers(&mut self, cx: &mut Context<'_>) {
4349
while let Poll::Ready(Some(new_peer)) = self.new_peers.poll_recv(cx) {
50+
if new_peer.info.direction == ConnectionDirection::Outbound {
51+
self.outbound_peers.insert(new_peer.info.id);
52+
}
53+
4454
self.peers
4555
.insert(new_peer.info.id, StoredClient::new(new_peer));
4656
}
@@ -49,9 +59,14 @@ impl<N: NetworkZone> PeerSet<N> {
4959
fn remove_dead_peers(&mut self) {
5060
let mut i = 0;
5161
while i < self.peers.len() {
52-
if self.peers[i].client.alive() {
62+
let peer = &self.peers[i];
63+
if peer.client.alive() {
5364
i += 1;
5465
} else {
66+
if peer.client.info.direction == ConnectionDirection::Outbound {
67+
self.outbound_peers.swap_remove(&peer.client.info.id);
68+
}
69+
5570
self.peers.swap_remove_index(i);
5671
}
5772
}
@@ -83,21 +98,43 @@ impl<N: NetworkZone> PeerSet<N> {
8398
PeerSetResponse::PeersWithMorePoW(
8499
self.peers
85100
.values()
86-
.filter_map(|client| {
87-
(!client.is_downloading_blocks()
101+
.filter(|&client| {
102+
!client.is_downloading_blocks()
88103
&& client
89104
.client
90105
.info
91106
.core_sync_data
92107
.lock()
93108
.unwrap()
94109
.cumulative_difficulty()
95-
> cumulative_difficulty)
96-
.then(|| client.downloading_blocks_guard())
110+
> cumulative_difficulty
97111
})
112+
.map(StoredClient::downloading_blocks_guard)
98113
.collect(),
99114
)
100115
}
116+
117+
fn random_peer_for_stem(&self) -> PeerSetResponse<N> {
118+
let outbound_peers = index::sample(
119+
&mut thread_rng(),
120+
self.outbound_peers.len(),
121+
self.outbound_peers.len(),
122+
);
123+
124+
for peer in outbound_peers
125+
.into_iter()
126+
.map(|i| self.outbound_peers.get_index(i).unwrap())
127+
{
128+
let client = self.peers.get(peer).unwrap();
129+
if client.is_a_stem_peer() {
130+
continue;
131+
}
132+
133+
return PeerSetResponse::StemPeer(Some(client.stem_peer_guard()));
134+
}
135+
136+
PeerSetResponse::StemPeer(None)
137+
}
101138
}
102139

103140
impl<N: NetworkZone> Service<PeerSetRequest> for PeerSet<N> {
@@ -120,6 +157,7 @@ impl<N: NetworkZone> Service<PeerSetRequest> for PeerSet<N> {
120157
PeerSetRequest::PeersWithMorePoW(cumulative_difficulty) => {
121158
Ok(self.peers_with_more_pow(cumulative_difficulty))
122159
}
160+
PeerSetRequest::StemPeer => Ok(self.random_peer_for_stem()),
123161
})
124162
}
125163
}

p2p/p2p/src/peer_set/client_wrappers.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,23 @@ impl<N: NetworkZone> StoredClient<N> {
3030
self.downloading_blocks.load(Ordering::Relaxed)
3131
}
3232

33+
pub(super) fn is_a_stem_peer(&self) -> bool {
34+
self.stem_peer.load(Ordering::Relaxed)
35+
}
36+
3337
pub(super) fn downloading_blocks_guard(&self) -> ClientDropGuard<N> {
3438
ClientDropGuard {
3539
client: self.client.downgrade(),
3640
bool: Arc::clone(&self.downloading_blocks),
3741
}
3842
}
3943

40-
/*
4144
pub(super) fn stem_peer_guard(&self) -> ClientDropGuard<N> {
4245
ClientDropGuard {
4346
client: self.client.downgrade(),
4447
bool: Arc::clone(&self.stem_peer),
4548
}
4649
}
47-
48-
*/
4950
}
5051

5152
pub struct ClientDropGuard<N: NetworkZone> {

0 commit comments

Comments
 (0)