1
1
use std:: {
2
2
collections:: HashSet ,
3
3
fs:: File ,
4
+ future:: Future ,
4
5
process:: Stdio ,
5
6
sync:: Arc ,
6
7
time:: { Duration , Instant } ,
@@ -308,7 +309,7 @@ impl BitcoinNodeCluster {
308
309
309
310
pub async fn wait_for_sync ( & self , timeout : Option < Duration > ) -> Result < ( ) > {
310
311
let start = Instant :: now ( ) ;
311
- let timeout = timeout. unwrap_or ( Duration :: from_secs ( 30 ) ) ;
312
+ let timeout = timeout. unwrap_or ( Duration :: from_secs ( 60 ) ) ;
312
313
while start. elapsed ( ) < timeout {
313
314
let mut heights = HashSet :: new ( ) ;
314
315
for node in & self . inner {
@@ -334,9 +335,53 @@ impl BitcoinNodeCluster {
334
335
SpawnOutput :: Container ( container) => container. ip . clone ( ) ,
335
336
SpawnOutput :: Child ( _) => "127.0.0.1" . to_string ( ) ,
336
337
} ;
338
+ let ip_port = format ! ( "{}:{}" , ip, to_node. config. p2p_port) ;
339
+
340
+ from_node. onetry_node ( & ip_port) . await ?;
341
+
342
+ let from_subver = from_node. get_network_info ( ) . await ?. subversion ;
343
+ let to_subver = to_node. get_network_info ( ) . await ?. subversion ;
344
+
345
+ // Check and wait for both inbound and outbound connections
346
+ wait_until ( || async {
347
+ let out_connected = from_node
348
+ . get_peer_info ( )
349
+ . await ?
350
+ . iter ( )
351
+ . any ( |peer| peer. subver == to_subver && !peer. inbound ) ;
352
+
353
+ let in_connected = to_node
354
+ . get_peer_info ( )
355
+ . await ?
356
+ . iter ( )
357
+ . any ( |peer| peer. subver == from_subver && peer. inbound ) ;
358
+
359
+ Ok ( out_connected && in_connected)
360
+ } )
361
+ . await ?;
337
362
338
- let target_node_addr = format ! ( "{}:{}" , ip, to_node. config. p2p_port) ;
339
- from_node. onetry_node ( & target_node_addr) . await ?;
363
+ // Handshake check. Wait for pong messages
364
+ wait_until ( || async {
365
+ let out_peer = from_node
366
+ . get_peer_info ( )
367
+ . await ?
368
+ . into_iter ( )
369
+ . find ( |peer| peer. subver == to_subver && !peer. inbound ) ;
370
+
371
+ let in_peer = to_node
372
+ . get_peer_info ( )
373
+ . await ?
374
+ . into_iter ( )
375
+ . find ( |peer| peer. subver == from_subver && peer. inbound ) ;
376
+
377
+ if let ( Some ( out_p) , Some ( in_p) ) = ( out_peer, in_peer) {
378
+ Ok ( out_p. bytesrecv_per_msg . get ( "pong" ) . unwrap_or ( & 0 ) >= & 29u64
379
+ && in_p. bytesrecv_per_msg . get ( "pong" ) . unwrap_or ( & 0 ) >= & 29u64 )
380
+ } else {
381
+ Ok ( false )
382
+ }
383
+ } )
384
+ . await ?;
340
385
}
341
386
}
342
387
}
@@ -347,20 +392,51 @@ impl BitcoinNodeCluster {
347
392
for ( i, from_node) in self . iter ( ) . enumerate ( ) {
348
393
for ( j, to_node) in self . iter ( ) . enumerate ( ) {
349
394
if i != j {
350
- let ip = match & to_node. spawn_output {
351
- SpawnOutput :: Container ( container) => container. ip . clone ( ) ,
352
- SpawnOutput :: Child ( _) => "127.0.0.1" . to_string ( ) ,
353
- } ;
354
-
355
- let target_node_addr = format ! ( "{}:{}" , ip, to_node. config. p2p_port) ;
356
- // from_node.remove_node(&target_node_addr).await?;
357
- from_node. disconnect_node ( & target_node_addr) . await ?;
395
+ let to_subver = to_node. get_network_info ( ) . await ?. subversion ;
396
+
397
+ let peers = from_node. get_peer_info ( ) . await ?;
398
+ let peer_ids: Vec < _ > = peers
399
+ . iter ( )
400
+ . filter ( |peer| peer. subver == to_subver)
401
+ . map ( |peer| peer. id )
402
+ . collect ( ) ;
403
+
404
+ if peer_ids. is_empty ( ) {
405
+ return Ok ( ( ) ) ;
406
+ }
407
+
408
+ for peer_id in peer_ids {
409
+ match from_node. disconnect_node_by_id ( peer_id as u32 ) . await {
410
+ Ok ( _) => ( ) ,
411
+ Err ( e) => {
412
+ if !e. to_string ( ) . contains ( "Node not found" ) {
413
+ bail ! ( "{e}" )
414
+ }
415
+ }
416
+ }
417
+ }
418
+
419
+ wait_until ( || self . test_connection ( from_node, to_node, false ) ) . await ?;
420
+ wait_until ( || self . test_connection ( to_node, from_node, false ) ) . await ?;
358
421
}
359
422
}
360
423
}
361
424
Ok ( ( ) )
362
425
}
363
426
427
+ async fn test_connection (
428
+ & self ,
429
+ from_node : & BitcoinNode ,
430
+ to_node : & BitcoinNode ,
431
+ expect_connected : bool ,
432
+ ) -> Result < bool > {
433
+ let to_subver = to_node. get_network_info ( ) . await ?. subversion ;
434
+ let peers = from_node. get_peer_info ( ) . await ?;
435
+ let is_connected = peers. iter ( ) . any ( |peer| peer. subver == to_subver) ;
436
+
437
+ Ok ( is_connected == expect_connected)
438
+ }
439
+
364
440
pub fn get ( & self , index : usize ) -> Option < & BitcoinNode > {
365
441
self . inner . get ( index)
366
442
}
@@ -389,3 +465,31 @@ async fn wait_for_rpc_ready(client: &Client, timeout: Option<Duration>) -> Resul
389
465
}
390
466
bail ! ( "Timeout waiting for RPC to be ready" )
391
467
}
468
+
469
+ pub async fn wait_until < F , Fut > ( mut f : F ) -> Result < ( ) >
470
+ where
471
+ F : FnMut ( ) -> Fut ,
472
+ Fut : Future < Output = Result < bool > > ,
473
+ {
474
+ let timeout = Duration :: from_secs ( 60 ) ;
475
+ let start = Instant :: now ( ) ;
476
+
477
+ while start. elapsed ( ) < timeout {
478
+ match f ( ) . await {
479
+ Ok ( true ) => return Ok ( ( ) ) ,
480
+ Ok ( false ) => {
481
+ sleep ( Duration :: from_secs ( 1 ) ) . await ;
482
+ continue ;
483
+ }
484
+ Err ( e) => {
485
+ if e. to_string ( ) . contains ( "node not connected" ) {
486
+ sleep ( Duration :: from_secs ( 1 ) ) . await ;
487
+ continue ;
488
+ }
489
+ return Err ( e) ;
490
+ }
491
+ }
492
+ }
493
+
494
+ bail ! ( "wait_until timed out after {} seconds" , timeout. as_secs( ) )
495
+ }
0 commit comments