Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use anyhow::Context;
use bitcoincore_rpc::RpcApi;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, info};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

Expand Down Expand Up @@ -76,6 +77,7 @@ impl TestFramework {
} else {
None
};

let citrea_cli = match &test_case.with_citrea_cli {
false => None,
true => Some(CitreaCli::new(CITREA_CLI_ENV)?),
Expand Down Expand Up @@ -127,7 +129,7 @@ impl TestFramework {
Ok(())
}

pub async fn init_citrea_nodes(&mut self) -> Result<()> {
pub async fn init_citrea_nodes(&mut self, failure_tx: UnboundedSender<String>) -> Result<()> {
// Use first node config for now, as citrea nodes are expected to interact only with this main node for now.
// Additional bitcoin node are solely used for simulating a bitcoin network and tx propagation/re-orgs
let bitcoin_config = &self.ctx.config.bitcoin[0];
Expand All @@ -136,7 +138,7 @@ impl TestFramework {
if self.ctx.config.test_case.get_n_nodes(NodeKind::Sequencer) > 1 {
self.sequencer_cluster = create_optional(
self.ctx.config.test_case.with_sequencer,
SequencerCluster::new(&self.ctx),
SequencerCluster::new(&self.ctx, failure_tx.clone()),
)
.await?;
} else {
Expand All @@ -146,6 +148,7 @@ impl TestFramework {
&self.ctx.config.sequencer[0],
bitcoin_config,
Arc::clone(&self.ctx.docker),
failure_tx.clone(),
),
)
.await?;
Expand All @@ -158,6 +161,7 @@ impl TestFramework {
&self.ctx.config.batch_prover,
bitcoin_config,
Arc::clone(&self.ctx.docker),
failure_tx.clone(),
),
)
.await?;
Expand All @@ -168,6 +172,7 @@ impl TestFramework {
&self.ctx.config.light_client_prover,
bitcoin_config,
Arc::clone(&self.ctx.docker),
failure_tx.clone(),
),
)
.await?;
Expand All @@ -178,15 +183,16 @@ impl TestFramework {
&self.ctx.config.full_node,
bitcoin_config,
Arc::clone(&self.ctx.docker),
failure_tx.clone(),
),
)
.await?;

Ok(())
}

pub async fn init_nodes(&mut self) -> Result<()> {
self.init_citrea_nodes().await?;
pub async fn init_nodes(&mut self, failure_tx: UnboundedSender<String>) -> Result<()> {
self.init_citrea_nodes(failure_tx).await?;
#[cfg(feature = "clementine")]
self.init_clementine_nodes().await?;
Ok(())
Expand Down
44 changes: 32 additions & 12 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use bitcoincore_rpc::{Auth, Client as BitcoinClient};
use serde::Serialize;
use tokio::{
process::Command,
sync::mpsc::UnboundedSender,
time::{sleep, Instant},
};
use tracing::{debug, info, trace};
Expand All @@ -29,6 +30,7 @@ use crate::{
docker::DockerEnv,
framework::TestContext,
log_provider::LogPathProvider,
test_case::watch_log_for_panics,
traits::{NodeT, Restart, SpawnOutput},
utils::{copy_directory, get_citrea_path, get_genesis_path},
Result,
Expand Down Expand Up @@ -102,6 +104,7 @@ where
pub client: Client,
// Bitcoin client targetting node's wallet endpoint
pub da: BitcoinClient,
failure_tx: UnboundedSender<String>,
}

impl<C> Node<C>
Expand All @@ -112,6 +115,7 @@ where
config: &FullL2NodeConfig<C>,
da_config: &BitcoinConfig,
docker: Arc<Option<DockerEnv>>,
failure_tx: UnboundedSender<String>,
) -> Result<Self> {
let spawn_output = <Self as NodeT>::spawn(config, &docker).await?;

Expand All @@ -129,11 +133,18 @@ where
.await
.context("Failed to create RPC client")?;

watch_log_for_panics(
config.log_path(),
config.kind().to_string(),
failure_tx.clone(),
);

Ok(Self {
spawn_output,
config: config.clone(),
client,
da: da_client,
failure_tx,
})
}

Expand Down Expand Up @@ -296,10 +307,9 @@ where
{
async fn wait_until_stopped(&mut self) -> Result<()> {
self.stop().await?;
match &mut self.spawn_output {
SpawnOutput::Child(pid) => pid.wait().await?,
SpawnOutput::Container(_) => unimplemented!("L2 nodes don't run in docker yet"),
};
if let SpawnOutput::Child(pid) = &mut self.spawn_output {
pid.wait().await?;
}
Ok(())
}

Expand All @@ -308,7 +318,9 @@ where
new_config: Option<Self::Config>,
extra_args: Option<Vec<String>>,
) -> Result<()> {
let panic_tx = self.failure_tx.clone();
let config = self.config_mut();
let kind = config.kind();

if let Some(new_config) = new_config {
*config = new_config;
Expand All @@ -320,17 +332,20 @@ where
INDEX.fetch_add(1, Ordering::SeqCst);

let old_dir = config.dir();
let new_dir = old_dir.parent().unwrap().join(format!(
"{}-{}",
config.kind(),
INDEX.load(Ordering::SeqCst)
));
let new_dir =
old_dir
.parent()
.unwrap()
.join(format!("{}-{}", kind, INDEX.load(Ordering::SeqCst)));
copy_directory(old_dir, &new_dir)?;

config.set_dir(new_dir);
config.write_to_file()?;
let log_path = config.log_path();

*self.spawn_output() = Self::spawn(config, extra_args)?;
watch_log_for_panics(log_path, kind.to_string(), panic_tx);

self.wait_for_ready(None).await
}
}
Expand Down Expand Up @@ -398,16 +413,21 @@ where
}

impl NodeCluster<SequencerConfig> {
pub async fn new(ctx: &TestContext) -> Result<Self> {
pub async fn new(ctx: &TestContext, failure_tx: UnboundedSender<String>) -> Result<Self> {
let n_nodes = ctx.config.test_case.get_n_nodes(NodeKind::Sequencer);

let mut cluster = Self {
inner: Vec::with_capacity(n_nodes),
};
let da_config = &ctx.config.bitcoin[0];
for config in &ctx.config.sequencer {
let node =
Node::<SequencerConfig>::new(config, da_config, Arc::clone(&ctx.docker)).await?;
let node = Node::<SequencerConfig>::new(
config,
da_config,
Arc::clone(&ctx.docker),
failure_tx.clone(),
)
.await?;
cluster.inner.push(node);
}

Expand Down
83 changes: 74 additions & 9 deletions src/test_case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
use std::{
io::Write,
panic::{self},
path::Path,
path::{Path, PathBuf},
};

use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::FutureExt;
use tokio::signal;
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom},
signal,
sync::{mpsc, mpsc::UnboundedSender},
};

use super::{
config::{BitcoinConfig, TestCaseConfig, TestCaseEnv},
Expand Down Expand Up @@ -43,9 +48,13 @@ impl<T: TestCase> TestCaseRunner<T> {
}

/// Internal method to fund the wallets, connect the nodes, wait for them to be ready.
async fn prepare(&self, f: &mut TestFramework) -> Result<()> {
async fn prepare(
&self,
f: &mut TestFramework,
failure_tx: UnboundedSender<String>,
) -> Result<()> {
f.fund_da_wallets().await?;
f.init_nodes().await?;
f.init_nodes(failure_tx).await?;
f.bitcoin_nodes.connect_nodes().await?;

if let Some(sequencer) = &f.sequencer {
Expand All @@ -71,8 +80,12 @@ impl<T: TestCase> TestCaseRunner<T> {
Ok(())
}

async fn run_test_case(&mut self, f: &mut TestFramework) -> Result<()> {
self.prepare(f).await?;
async fn run_test_case(
&mut self,
f: &mut TestFramework,
failure_tx: UnboundedSender<String>,
) -> Result<()> {
self.prepare(f, failure_tx).await?;
self.0.setup(f).await?;
self.0.run_test(f).await
}
Expand All @@ -82,18 +95,24 @@ impl<T: TestCase> TestCaseRunner<T> {
/// This sets up the framework, executes the test, and ensures cleanup is performed even if a panic occurs.
pub async fn run(mut self) -> Result<()> {
let mut framework = None;
let (failure_tx, mut failure_rx) = mpsc::unbounded_channel::<String>();

let result = panic::AssertUnwindSafe(async {
tokio::select! {
res = async {
framework = Some(TestFramework::new::<T>().await?);
let f = framework.as_mut().unwrap();
self.run_test_case(f).await
} => res,
self.run_test_case(f, failure_tx).await
} => res,
_ = signal::ctrl_c() => {
println!("Initiating shutdown...");
bail!("Shutdown received before completion")
}
e = failure_rx.recv() => {
if let Some(e) =e {
bail!(e)
}
Ok(())
}
}
})
.catch_unwind()
Expand Down Expand Up @@ -282,3 +301,49 @@ pub trait TestCase: Send + Sync + 'static {
Ok(())
}
}

pub fn watch_log_for_panics(
log_path: PathBuf,
process_name: String,
failure_tx: UnboundedSender<String>,
) {
tokio::spawn(async move {
while !log_path.exists() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

let mut file = File::open(&log_path).await.unwrap();

let _ = file.seek(SeekFrom::End(0)).await;
let mut reader = BufReader::new(file);
let mut line = String::new();

loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
}
Ok(_) => {
let line_lower = line.to_lowercase();

if line_lower.contains("panic")
|| line_lower.contains("fatal")
|| line_lower.contains("assertion failed")
{
let _ = failure_tx.send(format!(
"{} panicked with: {}",
process_name,
line.trim()
));
return;
}
}
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
});
}
Loading