Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 115 additions & 11 deletions src/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashSet,
fs::File,
future::Future,
process::Stdio,
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -308,7 +309,7 @@ impl BitcoinNodeCluster {

pub async fn wait_for_sync(&self, timeout: Option<Duration>) -> Result<()> {
let start = Instant::now();
let timeout = timeout.unwrap_or(Duration::from_secs(30));
let timeout = timeout.unwrap_or(Duration::from_secs(60));
while start.elapsed() < timeout {
let mut heights = HashSet::new();
for node in &self.inner {
Expand All @@ -334,9 +335,53 @@ impl BitcoinNodeCluster {
SpawnOutput::Container(container) => container.ip.clone(),
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
};
let ip_port = format!("{}:{}", ip, to_node.config.p2p_port);

from_node.onetry_node(&ip_port).await?;

let from_subver = from_node.get_network_info().await?.subversion;
let to_subver = to_node.get_network_info().await?.subversion;

// Check and wait for both inbound and outbound connections
wait_until(|| async {
let out_connected = from_node
.get_peer_info()
.await?
.iter()
.any(|peer| peer.subver == to_subver && !peer.inbound);

let in_connected = to_node
.get_peer_info()
.await?
.iter()
.any(|peer| peer.subver == from_subver && peer.inbound);

Ok(out_connected && in_connected)
})
.await?;

let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
from_node.onetry_node(&target_node_addr).await?;
// Handshake check. Wait for pong messages
wait_until(|| async {
let out_peer = from_node
.get_peer_info()
.await?
.into_iter()
.find(|peer| peer.subver == to_subver && !peer.inbound);

let in_peer = to_node
.get_peer_info()
.await?
.into_iter()
.find(|peer| peer.subver == from_subver && peer.inbound);

if let (Some(out_p), Some(in_p)) = (out_peer, in_peer) {
Ok(out_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64
&& in_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64)
} else {
Ok(false)
}
})
.await?;
}
}
}
Expand All @@ -347,20 +392,51 @@ impl BitcoinNodeCluster {
for (i, from_node) in self.iter().enumerate() {
for (j, to_node) in self.iter().enumerate() {
if i != j {
let ip = match &to_node.spawn_output {
SpawnOutput::Container(container) => container.ip.clone(),
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
};

let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
// from_node.remove_node(&target_node_addr).await?;
from_node.disconnect_node(&target_node_addr).await?;
let to_subver = to_node.get_network_info().await?.subversion;

let peers = from_node.get_peer_info().await?;
let peer_ids: Vec<_> = peers
.iter()
.filter(|peer| peer.subver == to_subver)
.map(|peer| peer.id)
.collect();

if peer_ids.is_empty() {
return Ok(());
}

for peer_id in peer_ids {
match from_node.disconnect_node_by_id(peer_id as u32).await {
Ok(_) => (),
Err(e) => {
if !e.to_string().contains("Node not found") {
bail!("{e}")
}
}
}
}

wait_until(|| self.test_connection(from_node, to_node, false)).await?;
wait_until(|| self.test_connection(to_node, from_node, false)).await?;
}
}
}
Ok(())
}

async fn test_connection(
&self,
from_node: &BitcoinNode,
to_node: &BitcoinNode,
expect_connected: bool,
) -> Result<bool> {
let to_subver = to_node.get_network_info().await?.subversion;
let peers = from_node.get_peer_info().await?;
let is_connected = peers.iter().any(|peer| peer.subver == to_subver);

Ok(is_connected == expect_connected)
}

pub fn get(&self, index: usize) -> Option<&BitcoinNode> {
self.inner.get(index)
}
Expand Down Expand Up @@ -389,3 +465,31 @@ async fn wait_for_rpc_ready(client: &Client, timeout: Option<Duration>) -> Resul
}
bail!("Timeout waiting for RPC to be ready")
}

pub async fn wait_until<F, Fut>(mut f: F) -> Result<()>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<bool>>,
{
let timeout = Duration::from_secs(60);
let start = Instant::now();

while start.elapsed() < timeout {
match f().await {
Ok(true) => return Ok(()),
Ok(false) => {
sleep(Duration::from_secs(1)).await;
continue;
}
Err(e) => {
if e.to_string().contains("node not connected") {
sleep(Duration::from_secs(1)).await;
continue;
}
return Err(e);
}
}
}

bail!("wait_until timed out after {} seconds", timeout.as_secs())
}
Loading