1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

27 changes: 16 additions & 11 deletions binaries/cuprated/src/blockchain/syncer.rs
@@ -12,7 +12,7 @@ use tracing::instrument;
 use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
 use cuprate_p2p::{
     block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
-    NetworkInterface,
+    NetworkInterface, PeerSetRequest, PeerSetResponse,
 };
 use cuprate_p2p_core::ClearNet;

@@ -28,15 +28,11 @@ pub enum SyncerError {
 }

 /// The syncer tasks that makes sure we are fully synchronised with our connected peers.
-#[expect(
-    clippy::significant_drop_tightening,
-    reason = "Client pool which will be removed"
-)]
 #[instrument(level = "debug", skip_all)]
 pub async fn syncer<C, CN>(
     mut context_svc: C,
     our_chain: CN,
-    clearnet_interface: NetworkInterface<ClearNet>,
+    mut clearnet_interface: NetworkInterface<ClearNet>,
     incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
     stop_current_block_downloader: Arc<Notify>,
     block_downloader_config: BlockDownloaderConfig,
@@ -67,8 +63,6 @@ where
         unreachable!();
     };

-    let client_pool = clearnet_interface.client_pool();
-
     tracing::debug!("Waiting for new sync info in top sync channel");

     loop {
@@ -79,9 +73,20 @@
         check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
         let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();

-        if !client_pool.contains_client_with_more_cumulative_difficulty(
-            raw_blockchain_context.cumulative_difficulty,
-        ) {
+        let PeerSetResponse::MostPoWSeen {
+            cumulative_difficulty,
+            ..
+        } = clearnet_interface
+            .peer_set()
+            .ready()
+            .await?
+            .call(PeerSetRequest::MostPoWSeen)
+            .await?
+        else {
+            unreachable!();
+        };
+
+        if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
             continue;
         }

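The new guard asks the peer set for the highest cumulative difficulty any connected peer claims, and skips the block downloader unless some peer claims strictly more PoW than the local chain. For readers unfamiliar with tower, the `.ready().await?.call(..).await?` chain follows the `Service` contract: readiness must be confirmed before `call` is allowed. Below is a minimal sketch of that idiom only, with a `service_fn` and made-up difficulty values standing in for the real peer set (none of these names are from the PR):

```rust
use tower::{Service, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), tower::BoxError> {
    // Stand-in for the peer set: answers a `MostPoWSeen`-style query with the
    // largest cumulative difficulty any peer has claimed (made-up value).
    let mut peer_set =
        tower::service_fn(|_req: ()| async move { Ok::<u128, tower::BoxError>(1_000) });

    let our_cumulative_difficulty: u128 = 900;

    // Same shape as the syncer above: wait for readiness, then call.
    let most_pow_seen = peer_set.ready().await?.call(()).await?;

    if most_pow_seen <= our_cumulative_difficulty {
        println!("no peer claims more PoW; skip the block downloader");
    } else {
        println!("a peer claims more PoW ({most_pow_seen}); start downloading");
    }
    Ok(())
}
```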
2 changes: 1 addition & 1 deletion binaries/cuprated/src/txpool/dandelion.rs
@@ -59,7 +59,7 @@ pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandel
         diffuse_service::DiffuseService {
             clear_net_broadcast_service: clear_net.broadcast_svc(),
         },
-        stem_service::OutboundPeerStream { clear_net },
+        stem_service::OutboundPeerStream::new(clear_net),
         DANDELION_CONFIG,
     )
 }
71 changes: 54 additions & 17 deletions binaries/cuprated/src/txpool/dandelion/stem_service.rs
@@ -1,14 +1,15 @@
 use std::{
     future::Future,
     pin::Pin,
-    task::{Context, Poll},
+    task::{ready, Context, Poll},
 };

 use bytes::Bytes;
-use futures::Stream;
+use futures::{future::BoxFuture, FutureExt, Stream};
 use tower::Service;

 use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
-use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
+use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
 use cuprate_p2p_core::{
     client::{Client, InternalPeerID},
     ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
@@ -19,7 +20,17 @@ use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};

 /// The dandelion outbound peer stream.
 pub struct OutboundPeerStream {
-    pub clear_net: NetworkInterface<ClearNet>,
+    clear_net: NetworkInterface<ClearNet>,
+    state: OutboundPeerStreamState,
+}
+
+impl OutboundPeerStream {
+    pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
+        Self {
+            clear_net,
+            state: OutboundPeerStreamState::Standby,
+        }
+    }
 }

 impl Stream for OutboundPeerStream {
@@ -28,23 +39,49 @@ impl Stream for OutboundPeerStream {
         tower::BoxError,
     >;

-    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        // TODO: make the outbound peer choice random.
-        Poll::Ready(Some(Ok(self
-            .clear_net
-            .client_pool()
-            .outbound_client()
-            .map_or(OutboundPeer::Exhausted, |client| {
-                OutboundPeer::Peer(
-                    CrossNetworkInternalPeerId::ClearNet(client.info.id),
-                    StemPeerService(client),
-                )
-            }))))
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        loop {
+            match &mut self.state {
+                OutboundPeerStreamState::Standby => {
+                    let peer_set = self.clear_net.peer_set();
+                    let res = ready!(peer_set.poll_ready(cx));
+
+                    self.state = OutboundPeerStreamState::AwaitingPeer(
+                        peer_set.call(PeerSetRequest::StemPeer).boxed(),
+                    );
+                }
+                OutboundPeerStreamState::AwaitingPeer(fut) => {
+                    let res = ready!(fut.poll_unpin(cx));
+
+                    return Poll::Ready(Some(res.map(|res| {
+                        let PeerSetResponse::StemPeer(stem_peer) = res else {
+                            unreachable!()
+                        };
+
+                        match stem_peer {
+                            Some(peer) => OutboundPeer::Peer(
+                                CrossNetworkInternalPeerId::ClearNet(peer.info.id),
+                                StemPeerService(peer),
+                            ),
+                            None => OutboundPeer::Exhausted,
+                        }
+                    })));
+                }
+            }
+        }
     }
 }

+/// The state of the [`OutboundPeerStream`].
+enum OutboundPeerStreamState {
+    /// Standby state.
+    Standby,
+    /// Awaiting a response from the peer-set.
+    AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
+}
+
 /// The stem service, used to send stem txs.
-pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>);
+pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);

 impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
     type Response = <Client<N> as Service<PeerRequest>>::Response;
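The `poll_next` above is a hand-rolled state machine that adapts a `tower::Service` (the peer set) into a `Stream`: `Standby` waits for service readiness, then stores the boxed response future in `AwaitingPeer` and polls it to completion. Here is a self-contained sketch of the same pattern against a toy service; `SvcStream`, `State`, and the `u64` request/response types are illustrative stand-ins, not code from this PR:

```rust
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use tower::Service;

/// State of the adaptor: ready to issue a request, or waiting on one in flight.
enum State {
    Standby,
    Awaiting(BoxFuture<'static, Result<u64, tower::BoxError>>),
}

/// Turns a `tower::Service` into an endless `Stream` of its responses.
struct SvcStream<S> {
    svc: S,
    state: State,
}

impl<S> Stream for SvcStream<S>
where
    S: Service<u64, Response = u64, Error = tower::BoxError> + Unpin,
    S::Future: Send + 'static,
{
    type Item = Result<u64, tower::BoxError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                State::Standby => {
                    // The `Service` contract: wait for readiness, then call.
                    if let Err(e) = ready!(self.svc.poll_ready(cx)) {
                        return Poll::Ready(Some(Err(e)));
                    }
                    let fut = self.svc.call(1).boxed();
                    self.state = State::Awaiting(fut);
                }
                State::Awaiting(fut) => {
                    // Drive the in-flight request; go back to `Standby` after.
                    let res = ready!(fut.poll_unpin(cx));
                    self.state = State::Standby;
                    return Poll::Ready(Some(res));
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let svc = tower::service_fn(|x: u64| async move { Ok::<u64, tower::BoxError>(x + 41) });
    let mut stream = SvcStream { svc, state: State::Standby };

    // Each `next()` runs one ready/call round-trip against the service.
    assert_eq!(stream.next().await.unwrap().unwrap(), 42);
}
```

Storing the in-flight future inside the enum is what lets the stream suspend across `poll_next` calls instead of blocking until the peer set responds.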
13 changes: 13 additions & 0 deletions p2p/p2p-core/src/client.rs
@@ -27,9 +27,11 @@ mod connector;
 pub mod handshaker;
 mod request_handler;
 mod timeout_monitor;
+mod weak;

 pub use connector::{ConnectRequest, Connector};
 pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder};
+pub use weak::WeakClient;

 /// An internal identifier for a given peer, will be their address if known
 /// or a random u128 if not.
@@ -128,6 +130,17 @@ impl<Z: NetworkZone> Client<Z> {
         }
         .into()
     }
+
+    /// Create a [`WeakClient`] for this [`Client`].
+    pub fn downgrade(&self) -> WeakClient<Z> {
+        WeakClient {
+            info: self.info.clone(),
+            connection_tx: self.connection_tx.downgrade(),
+            semaphore: self.semaphore.clone(),
+            permit: None,
+            error: self.error.clone(),
+        }
+    }
 }

 impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
114 changes: 114 additions & 0 deletions p2p/p2p-core/src/client/weak.rs
@@ -0,0 +1,114 @@
+use std::task::{ready, Context, Poll};
+
+use futures::channel::oneshot;
+use tokio::sync::{mpsc, OwnedSemaphorePermit};
+use tokio_util::sync::PollSemaphore;
+use tower::Service;
+
+use cuprate_helper::asynch::InfallibleOneshotReceiver;
+
+use crate::{
+    client::{connection, PeerInformation},
+    NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
+};
+
+/// A weak handle to a [`Client`](super::Client).
+///
+/// When this is dropped the peer will not be disconnected.
+pub struct WeakClient<N: NetworkZone> {
+    /// Information on the connected peer.
+    pub info: PeerInformation<N::Addr>,
+
+    /// The channel to the [`Connection`](connection::Connection) task.
+    pub(super) connection_tx: mpsc::WeakSender<connection::ConnectionTaskRequest>,
+
+    /// The semaphore that limits the requests sent to the peer.
+    pub(super) semaphore: PollSemaphore,
+    /// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready.
+    pub(super) permit: Option<OwnedSemaphorePermit>,
+
+    /// The error slot shared between the [`Client`] and [`Connection`](connection::Connection).
+    pub(super) error: SharedError<PeerError>,
+}
+
+impl<N: NetworkZone> WeakClient<N> {
+    /// Internal function to set an error on the [`SharedError`].
+    fn set_err(&self, err: PeerError) -> tower::BoxError {
+        let err_str = err.to_string();
+        match self.error.try_insert_err(err) {
+            Ok(()) => err_str,
+            Err(e) => e.to_string(),
+        }
+        .into()
+    }
+}
+
+impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
+    type Response = PeerResponse;
+    type Error = tower::BoxError;
+    type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if let Some(err) = self.error.try_get_err() {
+            return Poll::Ready(Err(err.to_string().into()));
+        }
+
+        if self.connection_tx.strong_count() == 0 {
+            let err = self.set_err(PeerError::ClientChannelClosed);
+            return Poll::Ready(Err(err));
+        }
+
+        if self.permit.is_some() {
+            return Poll::Ready(Ok(()));
+        }
+
+        let permit = ready!(self.semaphore.poll_acquire(cx))
+            .expect("Client semaphore should not be closed!");
+
+        self.permit = Some(permit);
+
+        Poll::Ready(Ok(()))
+    }
+
+    #[expect(clippy::significant_drop_tightening)]
+    fn call(&mut self, request: PeerRequest) -> Self::Future {
+        let permit = self
+            .permit
+            .take()
+            .expect("poll_ready did not return ready before call to call");
+
+        let (tx, rx) = oneshot::channel();
+        let req = connection::ConnectionTaskRequest {
+            response_channel: tx,
+            request,
+            permit: Some(permit),
+        };
+
+        match self.connection_tx.upgrade() {
+            None => {
+                self.set_err(PeerError::ClientChannelClosed);
+
+                let resp = Err(PeerError::ClientChannelClosed.into());
+                drop(req.response_channel.send(resp));
+            }
+            Some(sender) => {
+                if let Err(e) = sender.try_send(req) {
+                    // The connection task could have closed between a call to `poll_ready` and the call to
+                    // `call`, which means if we don't handle the error here the receiver would panic.
+                    use mpsc::error::TrySendError;
+
+                    match e {
+                        TrySendError::Closed(req) | TrySendError::Full(req) => {
+                            self.set_err(PeerError::ClientChannelClosed);
+
+                            let resp = Err(PeerError::ClientChannelClosed.into());
+                            drop(req.response_channel.send(resp));
+                        }
+                    }
+                }
+            }
+        }
+
+        rx.into()
+    }
+}
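`WeakClient` hinges on `tokio::sync::mpsc::WeakSender`: unlike the strong `Sender` held by `Client`, it does not keep the channel (and therefore the connection task) alive, which is why dropping a `WeakClient` cannot disconnect the peer. A standalone sketch of the `downgrade`/`upgrade`/`strong_count` behaviour that the `poll_ready` and `call` above rely on (toy channel and values, not PR code):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    // A weak sender, like `WeakClient::connection_tx`, holds no strong reference.
    let weak = tx.downgrade();
    assert_eq!(weak.strong_count(), 1);

    // While a strong sender exists, the weak handle upgrades successfully.
    let strong = weak.upgrade().expect("one strong sender still alive");
    strong.send(42).await.unwrap();
    assert_eq!(rx.recv().await, Some(42));

    // Once every strong sender is dropped the channel closes: `strong_count`
    // hits 0 (what `poll_ready` checks) and `upgrade` returns `None`
    // (what `call` handles).
    drop(tx);
    drop(strong);
    assert_eq!(weak.strong_count(), 0);
    assert!(weak.upgrade().is_none());
}
```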
4 changes: 2 additions & 2 deletions p2p/p2p/Cargo.toml
@@ -20,12 +20,12 @@ monero-serai = { workspace = true, features = ["std"] }

 tower = { workspace = true, features = ["buffer"] }
 tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
-rayon = { workspace = true }
 tokio-util = { workspace = true }
+rayon = { workspace = true }
 tokio-stream = { workspace = true, features = ["sync", "time"] }
 futures = { workspace = true, features = ["std"] }
 pin-project = { workspace = true }
-dashmap = { workspace = true }
+indexmap = { workspace = true, features = ["std"] }

 thiserror = { workspace = true }
 bytes = { workspace = true, features = ["std"] }