Skip to content

Commit 967537f

Browse files
authored
P2P: Implement incoming ping request handling over maximum inbound limit (#277)
Implement incoming ping request handling over maximum inbound limit - If the maximum inbound connection semaphore reach its limit, `inbound_server` fn will open a tokio task to check if the node wanted to ping us. If it is the case we respond, otherwise drop the connection. - Added some documentation to the `inbound_server` fn.
1 parent 0162553 commit 967537f

File tree

2 files changed

+62
-5
lines changed

2 files changed

+62
-5
lines changed

p2p/p2p/src/constants.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ use std::time::Duration;
33
/// The timeout we set on handshakes.
44
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);
55

6+
/// The timeout we set on receiving ping requests
7+
pub(crate) const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
8+
9+
/// The amount of concurrency (maximum number of simultaneous tasks) we allow for handling ping requests
10+
pub(crate) const PING_REQUEST_CONCURRENCY: usize = 2;
11+
612
/// The maximum amount of connections to make to seed nodes for when we need peers.
713
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
814

p2p/p2p/src/inbound_server.rs

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
//! them to the handshaker service and then adds them to the client pool.
55
use std::{pin::pin, sync::Arc};
66

7-
use futures::StreamExt;
7+
use futures::{SinkExt, StreamExt};
88
use tokio::{
99
sync::Semaphore,
10+
task::JoinSet,
1011
time::{sleep, timeout},
1112
};
1213
use tower::{Service, ServiceExt};
@@ -17,14 +18,22 @@ use cuprate_p2p_core::{
1718
services::{AddressBookRequest, AddressBookResponse},
1819
AddressBook, ConnectionDirection, NetworkZone,
1920
};
21+
use cuprate_wire::{
22+
admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT},
23+
AdminRequestMessage, AdminResponseMessage, Message,
24+
};
2025

2126
use crate::{
2227
client_pool::ClientPool,
23-
constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN},
28+
constants::{
29+
HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
30+
PING_REQUEST_TIMEOUT,
31+
},
2432
P2PConfig,
2533
};
2634

27-
/// Starts the inbound server.
35+
/// Starts the inbound server. This function will listen to all incoming connections
36+
/// and initiate handshake if needed, after verifying the address isn't banned.
2837
#[instrument(level = "warn", skip_all)]
2938
pub async fn inbound_server<N, HS, A>(
3039
client_pool: Arc<ClientPool<N>>,
@@ -40,6 +49,10 @@ where
4049
HS::Future: Send + 'static,
4150
A: AddressBook<N>,
4251
{
52+
// Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`).
53+
let our_peer_id = config.basic_node_data().peer_id;
54+
55+
// Mandatory. Extract server config from P2PConfig
4356
let Some(server_config) = config.server_config else {
4457
tracing::warn!("No inbound server config provided, not listening for inbound connections.");
4558
return Ok(());
@@ -53,13 +66,18 @@ where
5366

5467
let mut listener = pin!(listener);
5568

69+
// Create semaphore for limiting to maximum inbound connections.
5670
let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
71+
// Create ping request handling JoinSet
72+
let mut ping_join_set = JoinSet::new();
5773

74+
// Listen to incoming connections and extract necessary information.
5875
while let Some(connection) = listener.next().await {
59-
let Ok((addr, peer_stream, peer_sink)) = connection else {
76+
let Ok((addr, mut peer_stream, mut peer_sink)) = connection else {
6077
continue;
6178
};
6279

80+
// If peer is banned, drop connection
6381
if let Some(addr) = &addr {
6482
let AddressBookResponse::IsPeerBanned(banned) = address_book
6583
.ready()
@@ -75,11 +93,13 @@ where
7593
}
7694
}
7795

96+
// Create a new internal id for new peers
7897
let addr = match addr {
7998
Some(addr) => InternalPeerID::KnownAddr(addr),
8099
None => InternalPeerID::Unknown(rand::random()),
81100
};
82101

102+
// If we're still behind our maximum limit, Initiate handshake.
83103
if let Ok(permit) = semaphore.clone().try_acquire_owned() {
84104
tracing::debug!("Permit free for incoming connection, attempting handshake.");
85105

@@ -102,8 +122,39 @@ where
102122
.instrument(Span::current()),
103123
);
104124
} else {
125+
// Otherwise check if the node is simply pinging us.
105126
tracing::debug!("No permit free for incoming connection.");
106-
// TODO: listen for if the peer is just trying to ping us to see if we are reachable.
127+
128+
// We only handle 2 ping request conccurently. Otherwise we drop the connection immediately.
129+
if ping_join_set.len() < PING_REQUEST_CONCURRENCY {
130+
ping_join_set.spawn(
131+
async move {
132+
// Await first message from node. If it is a ping request we respond back, otherwise we drop the connection.
133+
let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
134+
135+
// Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
136+
if let Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) = fut.await
137+
{
138+
let response = peer_sink
139+
.send(
140+
Message::Response(AdminResponseMessage::Ping(PingResponse {
141+
status: PING_OK_RESPONSE_STATUS_TEXT,
142+
peer_id: our_peer_id,
143+
}))
144+
.into(),
145+
)
146+
.await;
147+
148+
if let Err(err) = response {
149+
tracing::debug!(
150+
"Unable to respond to ping request from peer ({addr}): {err}"
151+
)
152+
}
153+
}
154+
}
155+
.instrument(Span::current()),
156+
);
157+
}
107158
}
108159

109160
sleep(INBOUND_CONNECTION_COOL_DOWN).await;

0 commit comments

Comments
 (0)