44//! them to the handshaker service and then adds them to the client pool.
55use std:: { pin:: pin, sync:: Arc } ;
66
7- use futures:: StreamExt ;
7+ use futures:: { SinkExt , StreamExt } ;
88use tokio:: {
99 sync:: Semaphore ,
10+ task:: JoinSet ,
1011 time:: { sleep, timeout} ,
1112} ;
1213use tower:: { Service , ServiceExt } ;
@@ -17,14 +18,22 @@ use cuprate_p2p_core::{
1718 services:: { AddressBookRequest , AddressBookResponse } ,
1819 AddressBook , ConnectionDirection , NetworkZone ,
1920} ;
21+ use cuprate_wire:: {
22+ admin:: { PingResponse , PING_OK_RESPONSE_STATUS_TEXT } ,
23+ AdminRequestMessage , AdminResponseMessage , BucketError , Message ,
24+ } ;
2025
2126use crate :: {
2227 client_pool:: ClientPool ,
23- constants:: { HANDSHAKE_TIMEOUT , INBOUND_CONNECTION_COOL_DOWN } ,
28+ constants:: {
29+ HANDSHAKE_TIMEOUT , INBOUND_CONNECTION_COOL_DOWN , PING_REQUEST_CONCURRENCY ,
30+ PING_REQUEST_TIMEOUT ,
31+ } ,
2432 P2PConfig ,
2533} ;
2634
27- /// Starts the inbound server.
35+ /// Starts the inbound server. This function will listen to all incoming connections
36+ /// and initiate handshake if needed, after verifying the address isn't banned.
2837#[ instrument( level = "warn" , skip_all) ]
2938pub async fn inbound_server < N , HS , A > (
3039 client_pool : Arc < ClientPool < N > > ,
4049 HS :: Future : Send + ' static ,
4150 A : AddressBook < N > ,
4251{
52+ // Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`).
53+ let our_peer_id = config. basic_node_data ( ) . peer_id ;
54+
55+ // Mandatory. Extract server config from P2PConfig
4356 let Some ( server_config) = config. server_config else {
4457 tracing:: warn!( "No inbound server config provided, not listening for inbound connections." ) ;
4558 return Ok ( ( ) ) ;
@@ -53,13 +66,18 @@ where
5366
5467 let mut listener = pin ! ( listener) ;
5568
69+ // Create semaphore for limiting to maximum inbound connections.
5670 let semaphore = Arc :: new ( Semaphore :: new ( config. max_inbound_connections ) ) ;
71+ // Create ping request handling JoinSet
72+ let mut ping_join_set = JoinSet :: new ( ) ;
5773
74+ // Listen to incoming connections and extract necessary information.
5875 while let Some ( connection) = listener. next ( ) . await {
59- let Ok ( ( addr, peer_stream, peer_sink) ) = connection else {
76+ let Ok ( ( addr, mut peer_stream, mut peer_sink) ) = connection else {
6077 continue ;
6178 } ;
6279
80+ // If peer is banned, drop connection
6381 if let Some ( addr) = & addr {
6482 let AddressBookResponse :: IsPeerBanned ( banned) = address_book
6583 . ready ( )
@@ -75,11 +93,13 @@ where
7593 }
7694 }
7795
96+ // Create a new internal id for new peers
7897 let addr = match addr {
7998 Some ( addr) => InternalPeerID :: KnownAddr ( addr) ,
8099 None => InternalPeerID :: Unknown ( rand:: random ( ) ) ,
81100 } ;
82101
102+ // If we're still behind our maximum limit, Initiate handshake.
83103 if let Ok ( permit) = semaphore. clone ( ) . try_acquire_owned ( ) {
84104 tracing:: debug!( "Permit free for incoming connection, attempting handshake." ) ;
85105
@@ -102,8 +122,34 @@ where
102122 . instrument ( Span :: current ( ) ) ,
103123 ) ;
104124 } else {
125+ // Otherwise check if the node is simply pinging us.
105126 tracing:: debug!( "No permit free for incoming connection." ) ;
106- // TODO: listen for if the peer is just trying to ping us to see if we are reachable.
127+
128+ // We only handle 2 ping request conccurently. Otherwise we drop the connection immediately.
129+ if ping_join_set. len ( ) < PING_REQUEST_CONCURRENCY {
130+ ping_join_set. spawn (
131+ async move {
132+ // Await first message from node. If it is a ping request we respond back, otherwise we drop the connection.
133+ let fut = timeout ( PING_REQUEST_TIMEOUT , peer_stream. next ( ) ) ;
134+
135+ // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
136+ if let Ok ( Some ( Ok ( Message :: Request ( AdminRequestMessage :: Ping ) ) ) ) = fut. await {
137+ let response = peer_sink
138+ . send ( Message :: Response ( AdminResponseMessage :: Ping ( PingResponse {
139+ status : PING_OK_RESPONSE_STATUS_TEXT ,
140+ peer_id : our_peer_id
141+ }
142+ ) ) . into ( ) )
143+ . await ;
144+
145+ if let Err ( err) = response {
146+ tracing:: debug!( "Unable to respond to ping request from peer ({addr}): {err}" )
147+ }
148+ }
149+ }
150+ . instrument ( Span :: current ( ) )
151+ ) ;
152+ }
107153 }
108154
109155 sleep ( INBOUND_CONNECTION_COOL_DOWN ) . await ;
0 commit comments