4
4
//! them to the handshaker service and then adds them to the client pool.
5
5
use std:: { pin:: pin, sync:: Arc } ;
6
6
7
- use futures:: { StreamExt , SinkExt } ;
7
+ use futures:: { SinkExt , StreamExt } ;
8
8
use tokio:: {
9
- sync:: Semaphore , task:: JoinSet , time:: { sleep, timeout}
9
+ sync:: Semaphore ,
10
+ task:: JoinSet ,
11
+ time:: { sleep, timeout} ,
10
12
} ;
11
13
use tower:: { Service , ServiceExt } ;
12
14
use tracing:: { instrument, Instrument , Span } ;
13
15
14
- use cuprate_wire:: {
15
- admin:: { PingResponse , PING_OK_RESPONSE_STATUS_TEXT } ,
16
- AdminRequestMessage , AdminResponseMessage , BucketError , Message
17
- } ;
18
16
use cuprate_p2p_core:: {
19
17
client:: { Client , DoHandshakeRequest , HandshakeError , InternalPeerID } ,
20
18
services:: { AddressBookRequest , AddressBookResponse } ,
21
19
AddressBook , ConnectionDirection , NetworkZone ,
22
20
} ;
21
+ use cuprate_wire:: {
22
+ admin:: { PingResponse , PING_OK_RESPONSE_STATUS_TEXT } ,
23
+ AdminRequestMessage , AdminResponseMessage , BucketError , Message ,
24
+ } ;
23
25
24
26
use crate :: {
25
27
client_pool:: ClientPool ,
26
- constants:: { HANDSHAKE_TIMEOUT , INBOUND_CONNECTION_COOL_DOWN , PING_REQUEST_CONCURRENCY , PING_REQUEST_TIMEOUT } ,
28
+ constants:: {
29
+ HANDSHAKE_TIMEOUT , INBOUND_CONNECTION_COOL_DOWN , PING_REQUEST_CONCURRENCY ,
30
+ PING_REQUEST_TIMEOUT ,
31
+ } ,
27
32
P2PConfig ,
28
33
} ;
29
34
46
51
{
47
52
// Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`).
48
53
let our_peer_id = config. basic_node_data ( ) . peer_id ;
49
-
54
+
50
55
// Mandatory. Extract server config from P2PConfig
51
56
let Some ( server_config) = config. server_config else {
52
57
tracing:: warn!( "No inbound server config provided, not listening for inbound connections." ) ;
61
66
62
67
let mut listener = pin ! ( listener) ;
63
68
64
- // Create semaphore for limiting to maximum inbound connections.
69
+ // Create semaphore for limiting to maximum inbound connections.
65
70
let semaphore = Arc :: new ( Semaphore :: new ( config. max_inbound_connections ) ) ;
66
71
// Create ping request handling JoinSet
67
72
let mut ping_join_set = JoinSet :: new ( ) ;
94
99
None => InternalPeerID :: Unknown ( rand:: random ( ) ) ,
95
100
} ;
96
101
97
- // If we're still behind our maximum limit, Initiate handshake.
102
+ // If we're still behind our maximum limit, Initiate handshake.
98
103
if let Ok ( permit) = semaphore. clone ( ) . try_acquire_owned ( ) {
99
104
tracing:: debug!( "Permit free for incoming connection, attempting handshake." ) ;
100
105
@@ -119,23 +124,19 @@ where
119
124
} else {
120
125
// Otherwise check if the node is simply pinging us.
121
126
tracing:: debug!( "No permit free for incoming connection." ) ;
122
-
127
+
123
128
// We only handle 2 ping request conccurently. Otherwise we drop the connection immediately.
124
129
if ping_join_set. len ( ) < PING_REQUEST_CONCURRENCY {
125
130
ping_join_set. spawn (
126
131
async move {
127
132
// Await first message from node. If it is a ping request we respond back, otherwise we drop the connection.
128
133
let fut = timeout ( PING_REQUEST_TIMEOUT , peer_stream. next ( ) ) ;
129
-
130
134
#[ allow( clippy:: collapsible_match) ]
131
135
// Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
132
136
if let Ok ( Some ( Result :: < Message , BucketError > :: Ok ( command) ) ) = fut. await {
133
-
134
137
if let Message :: Request ( AdminRequestMessage :: Ping ) = command {
135
- let _ = peer_sink
136
- . send ( Message :: Response ( AdminResponseMessage :: Ping (
137
- PingResponse {
138
- status : PING_OK_RESPONSE_STATUS_TEXT ,
138
+ let _ = peer_sink. send ( Message :: Response ( AdminResponseMessage :: Ping ( PingResponse {
139
+ status : PING_OK_RESPONSE_STATUS_TEXT ,
139
140
peer_id : our_peer_id
140
141
}
141
142
) ) . into ( ) )
0 commit comments