Skip to content

Commit fc14879

Browse files
committed
Stricter dis/connect_nodes
1 parent 1193cbc commit fc14879

File tree

1 file changed

+118
-11
lines changed

1 file changed

+118
-11
lines changed

src/bitcoin.rs

Lines changed: 118 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
collections::HashSet,
33
fs::File,
4+
future::Future,
45
process::Stdio,
56
sync::Arc,
67
time::{Duration, Instant},
@@ -308,7 +309,7 @@ impl BitcoinNodeCluster {
308309

309310
pub async fn wait_for_sync(&self, timeout: Option<Duration>) -> Result<()> {
310311
let start = Instant::now();
311-
let timeout = timeout.unwrap_or(Duration::from_secs(30));
312+
let timeout = timeout.unwrap_or(Duration::from_secs(60));
312313
while start.elapsed() < timeout {
313314
let mut heights = HashSet::new();
314315
for node in &self.inner {
@@ -334,9 +335,53 @@ impl BitcoinNodeCluster {
334335
SpawnOutput::Container(container) => container.ip.clone(),
335336
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
336337
};
338+
let ip_port = format!("{}:{}", ip, to_node.config.p2p_port);
339+
340+
from_node.onetry_node(&ip_port).await?;
341+
342+
let from_subver = from_node.get_network_info().await?.subversion;
343+
let to_subver = to_node.get_network_info().await?.subversion;
344+
345+
// Check and wait for both inbound and outbound connections
346+
wait_until(|| async {
347+
let out_connected = from_node
348+
.get_peer_info()
349+
.await?
350+
.iter()
351+
.any(|peer| peer.subver == to_subver && !peer.inbound);
352+
353+
let in_connected = to_node
354+
.get_peer_info()
355+
.await?
356+
.iter()
357+
.any(|peer| peer.subver == from_subver && peer.inbound);
358+
359+
Ok(out_connected && in_connected)
360+
})
361+
.await?;
337362

338-
let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
339-
from_node.onetry_node(&target_node_addr).await?;
363+
// Handshake check. Wait for pong messages
364+
wait_until(|| async {
365+
let out_peer = from_node
366+
.get_peer_info()
367+
.await?
368+
.into_iter()
369+
.find(|peer| peer.subver == to_subver && !peer.inbound);
370+
371+
let in_peer = to_node
372+
.get_peer_info()
373+
.await?
374+
.into_iter()
375+
.find(|peer| peer.subver == from_subver && peer.inbound);
376+
377+
if let (Some(out_p), Some(in_p)) = (out_peer, in_peer) {
378+
Ok(out_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64
379+
&& in_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64)
380+
} else {
381+
Ok(false)
382+
}
383+
})
384+
.await?;
340385
}
341386
}
342387
}
@@ -347,20 +392,54 @@ impl BitcoinNodeCluster {
347392
for (i, from_node) in self.iter().enumerate() {
348393
for (j, to_node) in self.iter().enumerate() {
349394
if i != j {
350-
let ip = match &to_node.spawn_output {
351-
SpawnOutput::Container(container) => container.ip.clone(),
352-
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
353-
};
354-
355-
let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
356-
// from_node.remove_node(&target_node_addr).await?;
357-
from_node.disconnect_node(&target_node_addr).await?;
395+
let to_subver = to_node.get_network_info().await?.subversion;
396+
397+
let peers = from_node.get_peer_info().await?;
398+
let peer_ids: Vec<_> = peers
399+
.iter()
400+
.filter(|peer| peer.subver == to_subver)
401+
.map(|peer| peer.id)
402+
.collect();
403+
404+
if peer_ids.is_empty() {
405+
return Ok(());
406+
}
407+
408+
for peer_id in peer_ids {
409+
match from_node.disconnect_node_by_id(peer_id as u32).await {
410+
Ok(_) => (),
411+
Err(e) => {
412+
if !e.to_string().contains("Node not found") {
413+
bail!("{e}")
414+
}
415+
}
416+
}
417+
}
418+
419+
wait_until(|| self.test_connection(from_node, to_node, false)).await?;
420+
wait_until(|| self.test_connection(to_node, from_node, false)).await?;
358421
}
359422
}
360423
}
361424
Ok(())
362425
}
363426

427+
async fn test_connection(
428+
&self,
429+
from_node: &BitcoinNode,
430+
to_node: &BitcoinNode,
431+
expect_connected: bool,
432+
) -> Result<bool> {
433+
// Get subversion for identification
434+
let to_subver = to_node.get_network_info().await?.subversion;
435+
436+
// Check if peer exists with matching subversion
437+
let peers = from_node.get_peer_info().await?;
438+
let is_connected = peers.iter().any(|peer| peer.subver == to_subver);
439+
440+
Ok(is_connected == expect_connected)
441+
}
442+
364443
pub fn get(&self, index: usize) -> Option<&BitcoinNode> {
365444
self.inner.get(index)
366445
}
@@ -389,3 +468,31 @@ async fn wait_for_rpc_ready(client: &Client, timeout: Option<Duration>) -> Resul
389468
}
390469
bail!("Timeout waiting for RPC to be ready")
391470
}
471+
472+
pub async fn wait_until<F, Fut>(mut f: F) -> Result<()>
473+
where
474+
F: FnMut() -> Fut,
475+
Fut: Future<Output = Result<bool>>,
476+
{
477+
let timeout = Duration::from_secs(60);
478+
let start = Instant::now();
479+
480+
while start.elapsed() < timeout {
481+
match f().await {
482+
Ok(true) => return Ok(()),
483+
Ok(false) => {
484+
sleep(Duration::from_secs(1)).await;
485+
continue;
486+
}
487+
Err(e) => {
488+
if e.to_string().contains("node not connected") {
489+
sleep(Duration::from_secs(1)).await;
490+
continue;
491+
}
492+
return Err(e);
493+
}
494+
}
495+
}
496+
497+
bail!("wait_until timed out after {} seconds", timeout.as_secs())
498+
}

0 commit comments

Comments
 (0)