Skip to content

Commit 2a6ad0e

Browse files
committed
Merge branch 'main' of github.com:chainwayxyz/citrea-e2e into da-monitoring
2 parents 5c1a0d5 + 3a9bf6f commit 2a6ad0e

File tree

1 file changed

+115
-11
lines changed

1 file changed

+115
-11
lines changed

src/bitcoin.rs

Lines changed: 115 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
collections::HashSet,
33
fs::File,
4+
future::Future,
45
process::Stdio,
56
sync::Arc,
67
time::{Duration, Instant},
@@ -308,7 +309,7 @@ impl BitcoinNodeCluster {
308309

309310
pub async fn wait_for_sync(&self, timeout: Option<Duration>) -> Result<()> {
310311
let start = Instant::now();
311-
let timeout = timeout.unwrap_or(Duration::from_secs(30));
312+
let timeout = timeout.unwrap_or(Duration::from_secs(60));
312313
while start.elapsed() < timeout {
313314
let mut heights = HashSet::new();
314315
for node in &self.inner {
@@ -334,9 +335,53 @@ impl BitcoinNodeCluster {
334335
SpawnOutput::Container(container) => container.ip.clone(),
335336
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
336337
};
338+
let ip_port = format!("{}:{}", ip, to_node.config.p2p_port);
339+
340+
from_node.onetry_node(&ip_port).await?;
341+
342+
let from_subver = from_node.get_network_info().await?.subversion;
343+
let to_subver = to_node.get_network_info().await?.subversion;
344+
345+
// Check and wait for both inbound and outbound connections
346+
wait_until(|| async {
347+
let out_connected = from_node
348+
.get_peer_info()
349+
.await?
350+
.iter()
351+
.any(|peer| peer.subver == to_subver && !peer.inbound);
352+
353+
let in_connected = to_node
354+
.get_peer_info()
355+
.await?
356+
.iter()
357+
.any(|peer| peer.subver == from_subver && peer.inbound);
358+
359+
Ok(out_connected && in_connected)
360+
})
361+
.await?;
337362

338-
let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
339-
from_node.onetry_node(&target_node_addr).await?;
363+
// Handshake check. Wait for pong messages
364+
wait_until(|| async {
365+
let out_peer = from_node
366+
.get_peer_info()
367+
.await?
368+
.into_iter()
369+
.find(|peer| peer.subver == to_subver && !peer.inbound);
370+
371+
let in_peer = to_node
372+
.get_peer_info()
373+
.await?
374+
.into_iter()
375+
.find(|peer| peer.subver == from_subver && peer.inbound);
376+
377+
if let (Some(out_p), Some(in_p)) = (out_peer, in_peer) {
378+
Ok(out_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64
379+
&& in_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64)
380+
} else {
381+
Ok(false)
382+
}
383+
})
384+
.await?;
340385
}
341386
}
342387
}
@@ -347,20 +392,51 @@ impl BitcoinNodeCluster {
347392
for (i, from_node) in self.iter().enumerate() {
348393
for (j, to_node) in self.iter().enumerate() {
349394
if i != j {
350-
let ip = match &to_node.spawn_output {
351-
SpawnOutput::Container(container) => container.ip.clone(),
352-
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
353-
};
354-
355-
let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
356-
// from_node.remove_node(&target_node_addr).await?;
357-
from_node.disconnect_node(&target_node_addr).await?;
395+
let to_subver = to_node.get_network_info().await?.subversion;
396+
397+
let peers = from_node.get_peer_info().await?;
398+
let peer_ids: Vec<_> = peers
399+
.iter()
400+
.filter(|peer| peer.subver == to_subver)
401+
.map(|peer| peer.id)
402+
.collect();
403+
404+
if peer_ids.is_empty() {
405+
return Ok(());
406+
}
407+
408+
for peer_id in peer_ids {
409+
match from_node.disconnect_node_by_id(peer_id as u32).await {
410+
Ok(_) => (),
411+
Err(e) => {
412+
if !e.to_string().contains("Node not found") {
413+
bail!("{e}")
414+
}
415+
}
416+
}
417+
}
418+
419+
wait_until(|| self.test_connection(from_node, to_node, false)).await?;
420+
wait_until(|| self.test_connection(to_node, from_node, false)).await?;
358421
}
359422
}
360423
}
361424
Ok(())
362425
}
363426

427+
async fn test_connection(
428+
&self,
429+
from_node: &BitcoinNode,
430+
to_node: &BitcoinNode,
431+
expect_connected: bool,
432+
) -> Result<bool> {
433+
let to_subver = to_node.get_network_info().await?.subversion;
434+
let peers = from_node.get_peer_info().await?;
435+
let is_connected = peers.iter().any(|peer| peer.subver == to_subver);
436+
437+
Ok(is_connected == expect_connected)
438+
}
439+
364440
pub fn get(&self, index: usize) -> Option<&BitcoinNode> {
365441
self.inner.get(index)
366442
}
@@ -389,3 +465,31 @@ async fn wait_for_rpc_ready(client: &Client, timeout: Option<Duration>) -> Resul
389465
}
390466
bail!("Timeout waiting for RPC to be ready")
391467
}
468+
469+
pub async fn wait_until<F, Fut>(mut f: F) -> Result<()>
470+
where
471+
F: FnMut() -> Fut,
472+
Fut: Future<Output = Result<bool>>,
473+
{
474+
let timeout = Duration::from_secs(60);
475+
let start = Instant::now();
476+
477+
while start.elapsed() < timeout {
478+
match f().await {
479+
Ok(true) => return Ok(()),
480+
Ok(false) => {
481+
sleep(Duration::from_secs(1)).await;
482+
continue;
483+
}
484+
Err(e) => {
485+
if e.to_string().contains("node not connected") {
486+
sleep(Duration::from_secs(1)).await;
487+
continue;
488+
}
489+
return Err(e);
490+
}
491+
}
492+
}
493+
494+
bail!("wait_until timed out after {} seconds", timeout.as_secs())
495+
}

0 commit comments

Comments
 (0)