From 39445cfef5f5e45dc1020152caa480afcd4c0ad7 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 17 Jun 2025 12:35:10 +0200 Subject: [PATCH 01/12] feat: mock da --- crates/da/src/lib.rs | 1 + crates/node_types/lightclient/src/lib.rs | 3 + .../node_types/lightclient/src/tests/mod.rs | 64 +++++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 crates/node_types/lightclient/src/tests/mod.rs diff --git a/crates/da/src/lib.rs b/crates/da/src/lib.rs index 8fa721d4..ad0b3dee 100644 --- a/crates/da/src/lib.rs +++ b/crates/da/src/lib.rs @@ -66,6 +66,7 @@ impl From for (Digest, Digest) { /// `VerifiableStateTransition` is a trait wrapper around `FinalizedEpoch` that allows for mocking. /// The only concrete implementation of this trait is by `FinalizedEpoch`. +#[automock] pub trait VerifiableStateTransition: Send { fn verify( &self, diff --git a/crates/node_types/lightclient/src/lib.rs b/crates/node_types/lightclient/src/lib.rs index f432dda1..ea4bdd6b 100644 --- a/crates/node_types/lightclient/src/lib.rs +++ b/crates/node_types/lightclient/src/lib.rs @@ -1,2 +1,5 @@ pub mod lightclient; pub use lightclient::LightClient; + +#[cfg(test)] +mod tests; diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs new file mode 100644 index 00000000..635360ff --- /dev/null +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use anyhow::Result; +use prism_common::digest::Digest; +use prism_da::{ + EpochVerificationError, MockLightDataAvailabilityLayer, MockVerifiableStateTransition, + VerifiableEpoch, VerifiableStateTransition, +}; + +macro_rules! mock_da { + ($(($height:expr, $($spec:tt),+)),* $(,)?) => {{ + let mut mock_da = MockLightDataAvailabilityLayer::new(); + mock_da.expect_get_finalized_epoch().returning(move |height| { + match height { + $( + $height => { + let mut transitions = vec![]; + $( + let mut epoch = MockVerifiableStateTransition::new(); + mock_da!(@make_epoch epoch, $spec); + transitions.push(Box::new(epoch) as Box); + )+ + Ok(transitions) + } + )* + _ => Ok(vec![]), + } + }); + mock_da + }}; + + // Success case - tuple + (@make_epoch $epoch:ident, ($h1:expr, $h2:expr)) => { + let hash1 = $h1; + let hash2 = $h2; + $epoch.expect_verify().returning(move |_, _| { + Ok((Digest::hash(hash1), Digest::hash(hash2))) + }); + }; + + // Error case - Err(...) + (@make_epoch $epoch:ident, Err($error:expr)) => { + let err = $error; + $epoch.expect_verify().returning(move |_, _| Err(err)); + }; + + // String error shorthand + (@make_epoch $epoch:ident, $error:literal) => { + let err_msg = $error; + $epoch.expect_verify().returning(move |_, _| { + Err(EpochVerificationError::ProofVerificationError(err_msg.to_string())) + }); + }; +} + +fn setup() { + let mut mock_da = MockLightDataAvailabilityLayer::new(); + mock_da![ + (5, ("abc", "def")), + (6, ("ghi", "jkl"), ("xyz", "yuu")), + (7, ("mno", "pqr")), + (8, "Expected Error") + ]; +} From 92df3a577f4504e96da3ec50a91200791bb5cc12 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 24 Jun 2025 14:36:28 +0200 Subject: [PATCH 02/12] update --- .../node_types/lightclient/src/lightclient.rs | 45 +++++---- .../node_types/lightclient/src/tests/mod.rs | 94 ++++++++++++++++--- 2 files changed, 102 insertions(+), 37 deletions(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 8f663f3b..85b0749d 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -40,16 +40,18 @@ pub struct LightClient { /// The event channel, used to spawn new subscribers and publishers. event_chan: Arc, event_pub: Arc, + sync_state: Arc>, // The latest commitment. latest_commitment: Arc>>, } -struct SyncState { - current_height: u64, - initial_sync_completed: bool, - initial_sync_in_progress: bool, - latest_finalized_epoch: Option, +#[derive(Default, Clone)] +pub struct SyncState { + pub current_height: u64, + pub initial_sync_completed: bool, + pub initial_sync_in_progress: bool, + pub latest_finalized_epoch: Option, } #[allow(dead_code)] @@ -64,6 +66,8 @@ impl LightClient { let event_chan = da.event_channel(); let event_pub = Arc::new(event_chan.publisher()); + let sync_state = Arc::new(RwLock::new(SyncState::default())); + Self { da, sp1_vkeys, @@ -71,17 +75,16 @@ impl LightClient { event_chan, event_pub, latest_commitment: Arc::new(RwLock::new(None)), + sync_state, } } + pub async fn get_sync_state(&self) -> SyncState { + self.sync_state.read().await.clone() + } + pub async fn run(self: Arc) -> Result<()> { // start listening for new headers to update sync target - let sync_state = Arc::new(RwLock::new(SyncState { - current_height: 0, - initial_sync_completed: false, - initial_sync_in_progress: false, - latest_finalized_epoch: None, - })); let mut event_sub = self.event_chan.subscribe(); while let Ok(event_info) = event_sub.recv().await { @@ -90,27 +93,27 @@ impl LightClient { if let Some(metrics) = get_metrics() { metrics.record_celestia_synced_height(height, vec![]); if let Some(latest_finalized_epoch) = - sync_state.read().await.latest_finalized_epoch + self.sync_state.read().await.latest_finalized_epoch { metrics.record_current_epoch(latest_finalized_epoch, vec![]); } } info!("new height from headersub {}", height); - self.clone().handle_new_header(height, sync_state.clone()).await; + self.clone().handle_new_header(height).await; } } Ok(()) } - async fn handle_new_header(self: Arc, height: u64, state: Arc>) { + async fn handle_new_header(self: Arc, height: u64) { // start initial historical backward sync if needed and not already in progress { - let mut state_handle = state.write().await; + let mut state_handle = self.sync_state.write().await; if !state_handle.initial_sync_completed && !state_handle.initial_sync_in_progress { state_handle.initial_sync_in_progress = true; drop(state_handle); - self.start_backward_sync(height, state.clone()).await; + self.start_backward_sync(height).await; return; } } @@ -128,7 +131,7 @@ impl LightClient { self.event_pub.send(PrismEvent::RecursiveVerificationCompleted { height }); // Update our latest known finalized epoch - let mut state = state.write().await; + let mut state = self.sync_state.write().await; state.latest_finalized_epoch = Some(height); // If we're waiting for initial sync, this completes it @@ -149,11 +152,7 @@ impl LightClient { } } - async fn start_backward_sync( - self: Arc, - network_height: u64, - state: Arc>, - ) { + async fn start_backward_sync(self: Arc, network_height: u64) { info!("starting historical sync"); // Announce that sync has started self.event_pub.send(PrismEvent::SyncStarted { @@ -166,7 +165,7 @@ impl LightClient { // Start a task to find a finalized epoch by searching backward let light_client = Arc::clone(&self); - let state = state.clone(); + let state = self.sync_state.clone(); spawn_task(async move { // Find the most recent valid epoch by searching backward if let Some(epoch_height) = diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index 635360ff..0674080b 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -1,11 +1,21 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::{self, Arc}, + time::Duration, +}; use anyhow::Result; use prism_common::digest::Digest; use prism_da::{ - EpochVerificationError, MockLightDataAvailabilityLayer, MockVerifiableStateTransition, + LightDataAvailabilityLayer, MockLightDataAvailabilityLayer, MockVerifiableStateTransition, VerifiableEpoch, VerifiableStateTransition, + events::{EventChannel, PrismEvent}, }; +use prism_errors::EpochVerificationError; +use prism_keys::SigningKey; +use tokio::spawn; + +use crate::LightClient; macro_rules! mock_da { ($(($height:expr, $($spec:tt),+)),* $(,)?) => {{ @@ -17,7 +27,7 @@ macro_rules! mock_da { let mut transitions = vec![]; $( let mut epoch = MockVerifiableStateTransition::new(); - mock_da!(@make_epoch epoch, $spec); + mock_da!(@make_epoch epoch, $spec, $height); transitions.push(Box::new(epoch) as Box); )+ Ok(transitions) @@ -30,35 +40,91 @@ macro_rules! mock_da { }}; // Success case - tuple - (@make_epoch $epoch:ident, ($h1:expr, $h2:expr)) => { + (@make_epoch $epoch:ident, ($h1:expr, $h2:expr), $height:expr) => { let hash1 = $h1; let hash2 = $h2; + $epoch.expect_height().returning(move || $height); $epoch.expect_verify().returning(move |_, _| { - Ok((Digest::hash(hash1), Digest::hash(hash2))) + Ok((Digest::hash(hash1), Digest::hash(hash2)).into()) }); }; // Error case - Err(...) - (@make_epoch $epoch:ident, Err($error:expr)) => { + (@make_epoch $epoch:ident, Err($error:expr), $height:expr) => { let err = $error; + $epoch.expect_height().returning(move || $height); $epoch.expect_verify().returning(move |_, _| Err(err)); }; // String error shorthand - (@make_epoch $epoch:ident, $error:literal) => { + (@make_epoch $epoch:ident, $error:literal, $height:expr) => { let err_msg = $error; + $epoch.expect_height().returning(move || $height); $epoch.expect_verify().returning(move |_, _| { Err(EpochVerificationError::ProofVerificationError(err_msg.to_string())) }); }; } -fn setup() { - let mut mock_da = MockLightDataAvailabilityLayer::new(); - mock_da![ - (5, ("abc", "def")), - (6, ("ghi", "jkl"), ("xyz", "yuu")), - (7, ("mno", "pqr")), - (8, "Expected Error") +// TODO: This doesnt work fully yet, racy because the write to sync_state occurs before the update to comm +macro_rules! wait_for_height { + ($lc:expr, $target_height:expr) => {{ + let mut sync_state = $lc.get_sync_state().await; + while sync_state.current_height < $target_height { + tokio::time::sleep(Duration::from_millis(10)).await; + sync_state = $lc.get_sync_state().await; + } + }}; +} + +#[tokio::test] +async fn test_mock_da() { + let mut mock_da = mock_da![ + (4, ("g", "a")), + (5, ("a", "b")), + (6, ("b", "c"), ("c", "d")), + (7, ("d", "e")), + // (8, "Expected Error") ]; + let chan = EventChannel::new(); + let publisher = chan.publisher(); + mock_da.expect_event_channel().return_const(Arc::new(chan)); + + let mock_da = Arc::new(mock_da); + + let prover_key = SigningKey::new_ed25519(); + let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key())); + + let runner = lc.clone(); + spawn(async move { + runner.run().await.unwrap(); + }); + + //TODO: Just wait for events + tokio::time::sleep(Duration::from_secs(1)).await; + publisher.send(PrismEvent::UpdateDAHeight { height: 3 }); + + publisher.send(PrismEvent::UpdateDAHeight { height: 4 }); + wait_for_height!(lc, 4); + assert_eq!(Digest::hash("a"), lc.get_latest_commitment().await.unwrap()); + + publisher.send(PrismEvent::UpdateDAHeight { height: 5 }); + wait_for_height!(lc, 5); + tokio::time::sleep(Duration::from_secs(1)).await; + println!("{:?}", Digest::hash("a")); + println!("{:?}", Digest::hash("b")); + println!("{:?}", Digest::hash("c")); + println!("{:?}", Digest::hash("d")); + println!("{:?}", Digest::hash("e")); + assert_eq!(Digest::hash("b"), lc.get_latest_commitment().await.unwrap()); + + publisher.send(PrismEvent::UpdateDAHeight { height: 6 }); + wait_for_height!(lc, 6); + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!(Digest::hash("d"), lc.get_latest_commitment().await.unwrap()); + + publisher.send(PrismEvent::UpdateDAHeight { height: 7 }); + wait_for_height!(lc, 7); + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!(Digest::hash("e"), lc.get_latest_commitment().await.unwrap()); } From d834c24f619f4c604e2aee5815e9b52925d72384 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 25 Jun 2025 12:43:23 +0200 Subject: [PATCH 03/12] update --- Cargo.toml | 2 +- .../node_types/lightclient/src/tests/mod.rs | 145 +++++++++++++----- 2 files changed, 110 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c27f461e..f5061958 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -184,7 +184,7 @@ sha2-v0-10-8 = { git = "https://github.com/sp1-patches/RustCrypto-hashes", packa curve25519-dalek-ng = { git = "https://github.com/sp1-patches/curve25519-dalek-ng", tag = "patch-4.1.1-sp1-4.0.0" } p256 = { git = "https://github.com/sp1-patches/elliptic-curves", tag = "patch-p256-13.2-sp1-4.1.0" } k256 = { git = "https://github.com/sp1-patches/elliptic-curves", tag = "patch-k256-13.4-sp1-4.1.0" } -celestia-types = { git = "https://github.com/deltadevsde/lumina.git" } +celestia-types = { git = "https://github.com/deltadevsde/lumina.git" } lumina-node = { git = "https://github.com/deltadevsde/lumina.git" } [workspace.features] diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index 0674080b..ac06df8d 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -1,17 +1,10 @@ -use std::{ - collections::HashMap, - sync::{self, Arc}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use anyhow::Result; use prism_common::digest::Digest; use prism_da::{ - LightDataAvailabilityLayer, MockLightDataAvailabilityLayer, MockVerifiableStateTransition, - VerifiableEpoch, VerifiableStateTransition, + MockLightDataAvailabilityLayer, MockVerifiableStateTransition, VerifiableStateTransition, events::{EventChannel, PrismEvent}, }; -use prism_errors::EpochVerificationError; use prism_keys::SigningKey; use tokio::spawn; @@ -66,29 +59,47 @@ macro_rules! mock_da { }; } -// TODO: This doesnt work fully yet, racy because the write to sync_state occurs before the update to comm -macro_rules! wait_for_height { - ($lc:expr, $target_height:expr) => {{ - let mut sync_state = $lc.get_sync_state().await; - while sync_state.current_height < $target_height { - tokio::time::sleep(Duration::from_millis(10)).await; - sync_state = $lc.get_sync_state().await; +macro_rules! wait_for_sync { + ($sub:expr, $target_height:expr) => {{ + while let Ok(event) = $sub.recv().await { + match event.event { + PrismEvent::EpochVerified { height } => { + if height >= $target_height { + return; + } + } + PrismEvent::EpochVerificationFailed { height, error } => { + if height >= $target_height { + // TODO: Placeholder + println!("Epoch verification failed at height {}: {}", height, error); + return; + } + } + _ => {} + } } }}; } +macro_rules! assert_current_commitment { + ($lc:expr, $expected:expr) => { + let actual = $lc.get_latest_commitment().await.unwrap(); + assert_eq!(Digest::hash($expected), actual); + }; +} + #[tokio::test] -async fn test_mock_da() { +async fn test_realtime_sync() { let mut mock_da = mock_da![ (4, ("g", "a")), (5, ("a", "b")), - (6, ("b", "c"), ("c", "d")), - (7, ("d", "e")), - // (8, "Expected Error") + (7, ("b", "c"), ("c", "d")), + (8, ("d", "e")), ]; let chan = EventChannel::new(); let publisher = chan.publisher(); - mock_da.expect_event_channel().return_const(Arc::new(chan)); + let arced_chan = Arc::new(chan); + mock_da.expect_event_channel().return_const(arced_chan.clone()); let mock_da = Arc::new(mock_da); @@ -99,32 +110,94 @@ async fn test_mock_da() { spawn(async move { runner.run().await.unwrap(); }); + let mut sub = arced_chan.clone().subscribe(); //TODO: Just wait for events tokio::time::sleep(Duration::from_secs(1)).await; publisher.send(PrismEvent::UpdateDAHeight { height: 3 }); publisher.send(PrismEvent::UpdateDAHeight { height: 4 }); - wait_for_height!(lc, 4); - assert_eq!(Digest::hash("a"), lc.get_latest_commitment().await.unwrap()); + wait_for_sync!(sub, 4); + assert_current_commitment!(lc, "a"); publisher.send(PrismEvent::UpdateDAHeight { height: 5 }); - wait_for_height!(lc, 5); - tokio::time::sleep(Duration::from_secs(1)).await; - println!("{:?}", Digest::hash("a")); - println!("{:?}", Digest::hash("b")); - println!("{:?}", Digest::hash("c")); - println!("{:?}", Digest::hash("d")); - println!("{:?}", Digest::hash("e")); - assert_eq!(Digest::hash("b"), lc.get_latest_commitment().await.unwrap()); + wait_for_sync!(sub, 5); + assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 6 }); - wait_for_height!(lc, 6); - tokio::time::sleep(Duration::from_secs(1)).await; - assert_eq!(Digest::hash("d"), lc.get_latest_commitment().await.unwrap()); + wait_for_sync!(sub, 6); + assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 7 }); - wait_for_height!(lc, 7); + wait_for_sync!(sub, 6); + assert_current_commitment!(lc, "d"); + + publisher.send(PrismEvent::UpdateDAHeight { height: 8 }); + wait_for_sync!(sub, 7); + assert_current_commitment!(lc, "e"); +} + +#[tokio::test] +async fn test_backwards_sync() { + let mut mock_da = mock_da![(8, ("a", "b")),]; + let chan = EventChannel::new(); + let publisher = chan.publisher(); + let arced_chan = Arc::new(chan); + mock_da.expect_event_channel().return_const(arced_chan.clone()); + + let mock_da = Arc::new(mock_da); + + let prover_key = SigningKey::new_ed25519(); + let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key())); + + let runner = lc.clone(); + spawn(async move { + runner.run().await.unwrap(); + }); + let mut sub = arced_chan.clone().subscribe(); + + //TODO: Just wait for events + tokio::time::sleep(Duration::from_secs(1)).await; + publisher.send(PrismEvent::UpdateDAHeight { height: 20 }); + while let Ok(event_info) = sub.recv().await { + if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event { + assert_current_commitment!(lc, "b"); + return; + } + } +} + +#[tokio::test] +async fn test_incoming_epoch_during_backwards_sync() { + let mut mock_da = mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]; + let chan = EventChannel::new(); + let publisher = chan.publisher(); + let arced_chan = Arc::new(chan); + mock_da.expect_event_channel().return_const(arced_chan.clone()); + + let mock_da = Arc::new(mock_da); + + let prover_key = SigningKey::new_ed25519(); + let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key())); + + let runner = lc.clone(); + spawn(async move { + runner.run().await.unwrap(); + }); + let mut sub = arced_chan.clone().subscribe(); + + //TODO: Just wait for events tokio::time::sleep(Duration::from_secs(1)).await; - assert_eq!(Digest::hash("e"), lc.get_latest_commitment().await.unwrap()); + publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); + publisher.send(PrismEvent::UpdateDAHeight { height: 5101 }); + while let Ok(event_info) = sub.recv().await { + if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event { + assert_current_commitment!(lc, "d"); + return; + } + } + + let sync_state = lc.get_sync_state().await; + assert!(sync_state.initial_sync_completed); + assert!(!sync_state.initial_sync_in_progress); } From a896e477def1512906498449b9ec4ecfb3a59913 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 25 Jun 2025 12:45:26 +0200 Subject: [PATCH 04/12] fix --- crates/node_types/lightclient/src/tests/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index ac06df8d..033f07ce 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -129,11 +129,11 @@ async fn test_realtime_sync() { assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 7 }); - wait_for_sync!(sub, 6); + wait_for_sync!(sub, 7); assert_current_commitment!(lc, "d"); publisher.send(PrismEvent::UpdateDAHeight { height: 8 }); - wait_for_sync!(sub, 7); + wait_for_sync!(sub, 8); assert_current_commitment!(lc, "e"); } From f727dd05eecc89a91edc2ded9ff3ad14021da956 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 25 Jun 2025 14:05:58 +0200 Subject: [PATCH 05/12] cleaner setup --- crates/da/src/events.rs | 9 +- .../node_types/lightclient/src/lightclient.rs | 1 + .../node_types/lightclient/src/tests/mod.rs | 119 +++++++----------- .../uniffi-lightclient/src/types.rs | 3 + 4 files changed, 57 insertions(+), 75 deletions(-) diff --git a/crates/da/src/events.rs b/crates/da/src/events.rs index 9b2d18e2..348a8314 100644 --- a/crates/da/src/events.rs +++ b/crates/da/src/events.rs @@ -1,4 +1,4 @@ -use lumina_node::events::{NodeEvent, EventSubscriber as LuminaEventSub}; +use lumina_node::events::{EventSubscriber as LuminaEventSub, NodeEvent}; use prism_common::digest::Digest; use serde::Serialize; use std::{fmt, sync::Arc}; @@ -14,6 +14,7 @@ const EVENT_CHANNEL_CAPACITY: usize = 1024; #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum PrismEvent { + Ready, SyncStarted { height: u64 }, UpdateDAHeight { height: u64 }, EpochVerificationStarted { height: u64 }, @@ -35,6 +36,12 @@ pub enum PrismEvent { impl fmt::Display for PrismEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + PrismEvent::Ready => { + write!( + f, + "Node is ready to start sync and listening for incoming headers" + ) + } PrismEvent::SyncStarted { height } => { write!(f, "Starting sync at height {}", height) } diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 85b0749d..00ca16c4 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -87,6 +87,7 @@ impl LightClient { // start listening for new headers to update sync target let mut event_sub = self.event_chan.subscribe(); + self.event_pub.send(PrismEvent::Ready); while let Ok(event_info) = event_sub.recv().await { if let PrismEvent::UpdateDAHeight { height } = event_info.event { #[cfg(feature = "telemetry")] diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index 033f07ce..8ec17efb 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -1,9 +1,9 @@ -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use prism_common::digest::Digest; use prism_da::{ MockLightDataAvailabilityLayer, MockVerifiableStateTransition, VerifiableStateTransition, - events::{EventChannel, PrismEvent}, + events::{EventChannel, EventPublisher, EventSubscriber, PrismEvent}, }; use prism_keys::SigningKey; use tokio::spawn; @@ -59,26 +59,13 @@ macro_rules! mock_da { }; } -macro_rules! wait_for_sync { - ($sub:expr, $target_height:expr) => {{ - while let Ok(event) = $sub.recv().await { - match event.event { - PrismEvent::EpochVerified { height } => { - if height >= $target_height { - return; - } - } - PrismEvent::EpochVerificationFailed { height, error } => { - if height >= $target_height { - // TODO: Placeholder - println!("Epoch verification failed at height {}: {}", height, error); - return; - } - } - _ => {} - } - } - }}; +async fn wait_for_sync(sub: &mut EventSubscriber, target_height: u64) { + wait_for_event(sub, |event| match event { + PrismEvent::EpochVerified { height } => height >= target_height, + PrismEvent::EpochVerificationFailed { height, .. } => height >= target_height, + _ => false, + }) + .await; } macro_rules! assert_current_commitment { @@ -88,14 +75,20 @@ macro_rules! assert_current_commitment { }; } -#[tokio::test] -async fn test_realtime_sync() { - let mut mock_da = mock_da![ - (4, ("g", "a")), - (5, ("a", "b")), - (7, ("b", "c"), ("c", "d")), - (8, ("d", "e")), - ]; +async fn wait_for_event(sub: &mut EventSubscriber, mut handler: F) +where + F: FnMut(PrismEvent) -> bool, // return true to break the loop +{ + while let Ok(event_info) = sub.recv().await { + if handler(event_info.event) { + break; + } + } +} + +async fn setup( + mut mock_da: MockLightDataAvailabilityLayer, +) -> (Arc, EventSubscriber, EventPublisher) { let chan = EventChannel::new(); let publisher = chan.publisher(); let arced_chan = Arc::new(chan); @@ -111,56 +104,51 @@ async fn test_realtime_sync() { runner.run().await.unwrap(); }); let mut sub = arced_chan.clone().subscribe(); + wait_for_event(&mut sub, |event| matches!(event, PrismEvent::Ready)).await; + (lc, sub, publisher) +} + +#[tokio::test] +async fn test_realtime_sync() { + let (lc, mut sub, publisher) = setup(mock_da![ + (4, ("g", "a")), + (5, ("a", "b")), + (7, ("b", "c"), ("c", "d")), + (8, ("d", "e")), + ]) + .await; - //TODO: Just wait for events - tokio::time::sleep(Duration::from_secs(1)).await; publisher.send(PrismEvent::UpdateDAHeight { height: 3 }); publisher.send(PrismEvent::UpdateDAHeight { height: 4 }); - wait_for_sync!(sub, 4); + wait_for_sync(&mut sub, 4).await; assert_current_commitment!(lc, "a"); publisher.send(PrismEvent::UpdateDAHeight { height: 5 }); - wait_for_sync!(sub, 5); + wait_for_sync(&mut sub, 5).await; assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 6 }); - wait_for_sync!(sub, 6); + wait_for_sync(&mut sub, 6).await; assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 7 }); - wait_for_sync!(sub, 7); + wait_for_sync(&mut sub, 7).await; assert_current_commitment!(lc, "d"); publisher.send(PrismEvent::UpdateDAHeight { height: 8 }); - wait_for_sync!(sub, 8); + wait_for_sync(&mut sub, 8).await; assert_current_commitment!(lc, "e"); } #[tokio::test] async fn test_backwards_sync() { - let mut mock_da = mock_da![(8, ("a", "b")),]; - let chan = EventChannel::new(); - let publisher = chan.publisher(); - let arced_chan = Arc::new(chan); - mock_da.expect_event_channel().return_const(arced_chan.clone()); + let (lc, mut sub, publisher) = setup(mock_da![(8, ("a", "b"))]).await; - let mock_da = Arc::new(mock_da); - - let prover_key = SigningKey::new_ed25519(); - let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key())); - - let runner = lc.clone(); - spawn(async move { - runner.run().await.unwrap(); - }); - let mut sub = arced_chan.clone().subscribe(); - - //TODO: Just wait for events - tokio::time::sleep(Duration::from_secs(1)).await; publisher.send(PrismEvent::UpdateDAHeight { height: 20 }); while let Ok(event_info) = sub.recv().await { - if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event { + if let PrismEvent::RecursiveVerificationCompleted { height } = event_info.event { + assert_eq!(height, 8); assert_current_commitment!(lc, "b"); return; } @@ -169,25 +157,8 @@ async fn test_backwards_sync() { #[tokio::test] async fn test_incoming_epoch_during_backwards_sync() { - let mut mock_da = mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]; - let chan = EventChannel::new(); - let publisher = chan.publisher(); - let arced_chan = Arc::new(chan); - mock_da.expect_event_channel().return_const(arced_chan.clone()); - - let mock_da = Arc::new(mock_da); - - let prover_key = SigningKey::new_ed25519(); - let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key())); - - let runner = lc.clone(); - spawn(async move { - runner.run().await.unwrap(); - }); - let mut sub = arced_chan.clone().subscribe(); + let (lc, mut sub, publisher) = setup(mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]).await; - //TODO: Just wait for events - tokio::time::sleep(Duration::from_secs(1)).await; publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); publisher.send(PrismEvent::UpdateDAHeight { height: 5101 }); while let Ok(event_info) = sub.recv().await { diff --git a/crates/node_types/uniffi-lightclient/src/types.rs b/crates/node_types/uniffi-lightclient/src/types.rs index 7ca53ac1..c346b6a4 100644 --- a/crates/node_types/uniffi-lightclient/src/types.rs +++ b/crates/node_types/uniffi-lightclient/src/types.rs @@ -3,6 +3,8 @@ use prism_da::events::PrismEvent; /// Event types emitted by the LightClient. #[derive(uniffi::Enum)] pub enum UniffiLightClientEvent { + /// LightClient is ready to start syncing + Ready, /// Sync has started at a specific height SyncStarted { /// The height at which sync started @@ -62,6 +64,7 @@ pub enum UniffiLightClientEvent { impl From for UniffiLightClientEvent { fn from(event: PrismEvent) -> Self { match event { + PrismEvent::Ready => UniffiLightClientEvent::Ready, PrismEvent::SyncStarted { height } => UniffiLightClientEvent::SyncStarted { height }, PrismEvent::UpdateDAHeight { height } => { UniffiLightClientEvent::UpdateDAHeight { height } From 545635692602a3a5f0ae4a467131632fd199c4d9 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 25 Jun 2025 14:23:31 +0200 Subject: [PATCH 06/12] another (failing) case --- crates/node_types/lightclient/src/lightclient.rs | 8 ++++++-- crates/node_types/lightclient/src/tests/mod.rs | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 00ca16c4..0da81fca 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -271,7 +271,8 @@ impl LightClient { Ok(()) } - async fn process_height(&self, height: u64) -> Result<()> { + /// Returns the count of successfully processed epochs + async fn process_height(&self, height: u64) -> Result { info!("processing at DA height {}", height); self.event_pub.send(PrismEvent::EpochVerificationStarted { height }); @@ -282,6 +283,7 @@ impl LightClient { } // Process each finalized epoch + let mut count = 0; for epoch in finalized_epochs { if let Err(e) = self.process_epoch(epoch).await { let error = format!("Failed to process epoch: {}", e); @@ -289,9 +291,11 @@ impl LightClient { height, error: error.clone(), }); + } else { + count += 1; } } - Ok(()) + Ok(count) } Err(e) => { let error = format!("Failed to get epoch: {}", e); diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index 8ec17efb..ef9d95f6 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -5,6 +5,7 @@ use prism_da::{ MockLightDataAvailabilityLayer, MockVerifiableStateTransition, VerifiableStateTransition, events::{EventChannel, EventPublisher, EventSubscriber, PrismEvent}, }; +use prism_errors::EpochVerificationError; use prism_keys::SigningKey; use tokio::spawn; @@ -155,6 +156,20 @@ async fn test_backwards_sync() { } } +#[tokio::test] +async fn test_backwards_sync_ignores_error() { + let (lc, mut sub, publisher) = setup(mock_da![(8, ("a", "b")), (10, "Error")]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 20 }); + while let Ok(event_info) = sub.recv().await { + if let PrismEvent::RecursiveVerificationCompleted { height } = event_info.event { + assert_eq!(height, 8); + assert_current_commitment!(lc, "b"); + return; + } + } +} + #[tokio::test] async fn test_incoming_epoch_during_backwards_sync() { let (lc, mut sub, publisher) = setup(mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]).await; From af687eb15c28e655eda44082d383bd4f5530f6be Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 25 Jun 2025 15:08:09 +0200 Subject: [PATCH 07/12] fixing bugs --- .../node_types/lightclient/src/lightclient.rs | 91 +++++++++++-------- .../node_types/lightclient/src/tests/mod.rs | 1 - 2 files changed, 51 insertions(+), 41 deletions(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 0da81fca..90c68b1f 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -169,40 +169,56 @@ impl LightClient { let state = self.sync_state.clone(); spawn_task(async move { // Find the most recent valid epoch by searching backward - if let Some(epoch_height) = - light_client.find_most_recent_epoch(network_height, state.clone()).await - { - // Process the found epoch - match light_client.process_height(epoch_height).await { - Ok(_) => { - info!( - "found historical finalized epoch at height {}", - epoch_height - ); - light_client.event_pub.send(PrismEvent::RecursiveVerificationCompleted { - height: epoch_height, - }); - - let mut state = state.write().await; - state.initial_sync_completed = true; - state.initial_sync_in_progress = false; - state.latest_finalized_epoch = Some(epoch_height); - state.current_height = epoch_height + 1; - } - Err(e) => { - error!("Failed to process epoch at height {}: {}", epoch_height, e); - light_client.event_pub.send(PrismEvent::EpochVerificationFailed { - height: epoch_height, - error: e.to_string(), - }); - - // Mark initial sync as complete but don't update current height - let mut state = state.write().await; - state.initial_sync_completed = true; - state.initial_sync_in_progress = false; + let mut current_height = network_height; + let min_height = if current_height > MAX_BACKWARD_SEARCH_DEPTH { + current_height - MAX_BACKWARD_SEARCH_DEPTH + } else { + 1 + }; + while current_height >= min_height { + // Look backwards for the first height with epochs + if let Some((da_height, epochs)) = light_client + .find_most_recent_epoch(current_height, min_height, state.clone()) + .await + { + // Try to find a single valid epoch + for epoch in epochs { + let epoch_height = epoch.height(); + match light_client.process_epoch(epoch).await { + Ok(_) => { + info!( + "found historical finalized epoch at da height {}", + da_height + ); + light_client.event_pub.send( + PrismEvent::RecursiveVerificationCompleted { + height: da_height, + }, + ); + + let mut state = state.write().await; + state.initial_sync_completed = true; + state.initial_sync_in_progress = false; + state.latest_finalized_epoch = Some(epoch_height); + state.current_height = da_height + 1; + + // Break out of the loop if a single epoch is processed successfully + return; + } + Err(e) => { + error!("Failed to process epoch at height {}: {}", da_height, e); + light_client.event_pub.send(PrismEvent::EpochVerificationFailed { + height: da_height, + error: e.to_string(), + }); + + // Keep looking backwards, as long as we haven't reached min_height + current_height = da_height - 1; + } + } } } - } else { + // No epoch found in backward search, mark initial sync as complete // but don't update current height - we'll wait for new epochs let mut state = state.write().await; @@ -215,15 +231,10 @@ impl LightClient { async fn find_most_recent_epoch( &self, start_height: u64, + min_height: u64, state: Arc>, - ) -> Option { + ) -> Option<(u64, Vec)> { let mut height = start_height; - let min_height = if start_height > MAX_BACKWARD_SEARCH_DEPTH { - start_height - MAX_BACKWARD_SEARCH_DEPTH - } else { - 1 - }; - while height >= min_height { // if an epoch has been found, we no longer need to sync historically if state.read().await.latest_finalized_epoch.is_some() { @@ -238,7 +249,7 @@ impl LightClient { if epochs.is_empty() { info!("no data found at height {}", height); } else { - return Some(height); + return Some((height, epochs)); } } Err(e) => { diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index ef9d95f6..d15c0767 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -130,7 +130,6 @@ async fn test_realtime_sync() { assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 6 }); - wait_for_sync(&mut sub, 6).await; assert_current_commitment!(lc, "b"); publisher.send(PrismEvent::UpdateDAHeight { height: 7 }); From ed04f38c6b84053f5fe56c01976bb97e57681ecf Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 25 Jun 2025 15:20:32 +0200 Subject: [PATCH 08/12] new test cases, some fixes --- .../node_types/lightclient/src/lightclient.rs | 8 +++-- .../node_types/lightclient/src/tests/mod.rs | 36 ++++++++++++++++++- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 90c68b1f..6ce1842b 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -142,8 +142,8 @@ impl LightClient { state.initial_sync_in_progress = false; } - // Update current height to the epoch height + 1 - state.current_height = height + 1; + // Update current height to the da height + state.current_height = height; } } } @@ -200,7 +200,7 @@ impl LightClient { state.initial_sync_completed = true; state.initial_sync_in_progress = false; state.latest_finalized_epoch = Some(epoch_height); - state.current_height = da_height + 1; + state.current_height = da_height; // Break out of the loop if a single epoch is processed successfully return; @@ -212,6 +212,8 @@ impl LightClient { error: e.to_string(), }); + let mut state = state.write().await; + state.current_height = da_height; // Keep looking backwards, as long as we haven't reached min_height current_height = da_height - 1; } diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index d15c0767..a76b0018 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use prism_common::digest::Digest; use prism_da::{ @@ -169,6 +169,40 @@ async fn test_backwards_sync_ignores_error() { } } +#[tokio::test] +async fn test_incoming_sync_ignores_error() { + let (lc, mut sub, publisher) = + setup(mock_da![(8, ("a", "b")), (10, "Error"), (12, ("c", "d"))]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 8 }); + wait_for_sync(&mut sub, 8).await; + assert_current_commitment!(lc, "b"); + + publisher.send(PrismEvent::UpdateDAHeight { height: 10 }); + publisher.send(PrismEvent::UpdateDAHeight { height: 12 }); + wait_for_sync(&mut sub, 12).await; + assert_current_commitment!(lc, "d"); +} + +// NOTE TO SELF: Handle_new_header method's first block is sus af +// I think we need to make sure it doesnt ever run twice + +#[tokio::test] +async fn test_will_not_process_older_epoch() { + let (lc, mut sub, publisher) = setup(mock_da![(8, ("a", "b")), (9, ("c", "d"))]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 10 }); + wait_for_sync(&mut sub, 9).await; + assert_current_commitment!(lc, "d"); + + publisher.send(PrismEvent::UpdateDAHeight { height: 8 }); + // TODO: replace with event listener + tokio::time::sleep(Duration::from_secs(1)).await; + + let sync_state = lc.get_sync_state().await; + assert_eq!(sync_state.current_height, 9); +} + #[tokio::test] async fn test_incoming_epoch_during_backwards_sync() { let (lc, mut sub, publisher) = setup(mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]).await; From e8705c5d19d61d2bf279603c5fbf8fee44b61b9e Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 26 Jun 2025 09:49:03 +0200 Subject: [PATCH 09/12] ignoring too old headers --- crates/node_types/lightclient/src/lightclient.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 6ce1842b..0ac2ba55 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -9,7 +9,7 @@ use prism_keys::VerifyingKey; use prism_telemetry_registry::metrics_registry::get_metrics; use std::{self, sync::Arc}; use tokio::sync::RwLock; -use tracing::{error, info}; +use tracing::{error, info, warn}; #[allow(unused_imports)] use sp1_verifier::Groth16Verifier; @@ -111,6 +111,13 @@ impl LightClient { // start initial historical backward sync if needed and not already in progress { let mut state_handle = self.sync_state.write().await; + if state_handle.current_height > height { + warn!( + "new height from headersub {} is lower than synced height, skipping", + height + ); + return; + } if !state_handle.initial_sync_completed && !state_handle.initial_sync_in_progress { state_handle.initial_sync_in_progress = true; drop(state_handle); From fb791b868277c84a3f8bb9741b210dfe251ebbe6 Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 26 Jun 2025 10:24:30 +0200 Subject: [PATCH 10/12] simplifications --- .../node_types/lightclient/src/lightclient.rs | 120 ++++-------------- .../node_types/lightclient/src/tests/mod.rs | 3 +- 2 files changed, 26 insertions(+), 97 deletions(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 0ac2ba55..3cd4064b 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -1,8 +1,7 @@ use anyhow::Result; use prism_common::digest::Digest; use prism_da::{ - FinalizedEpoch, LightDataAvailabilityLayer, VerifiableEpoch, VerificationKeys, - events::EventChannel, + LightDataAvailabilityLayer, VerifiableEpoch, VerificationKeys, events::EventChannel, }; use prism_keys::VerifyingKey; #[cfg(feature = "telemetry")] @@ -48,9 +47,9 @@ pub struct LightClient { #[derive(Default, Clone)] pub struct SyncState { + /// The current synced DA height of the light client. pub current_height: u64, - pub initial_sync_completed: bool, - pub initial_sync_in_progress: bool, + /// The current synced epoch height of the light client. pub latest_finalized_epoch: Option, } @@ -88,8 +87,12 @@ impl LightClient { let mut event_sub = self.event_chan.subscribe(); self.event_pub.send(PrismEvent::Ready); + + let mut backwards_sync_started = false; while let Ok(event_info) = event_sub.recv().await { if let PrismEvent::UpdateDAHeight { height } = event_info.event { + info!("new height from headersub {}", height); + #[cfg(feature = "telemetry")] if let Some(metrics) = get_metrics() { metrics.record_celestia_synced_height(height, vec![]); @@ -99,7 +102,14 @@ impl LightClient { metrics.record_current_epoch(latest_finalized_epoch, vec![]); } } - info!("new height from headersub {}", height); + + // start initial historical backward sync if not already in progress + if !backwards_sync_started { + backwards_sync_started = true; + spawn_task(self.clone().start_backward_sync(height)); + continue; + } + self.clone().handle_new_header(height).await; } } @@ -108,20 +118,14 @@ impl LightClient { } async fn handle_new_header(self: Arc, height: u64) { - // start initial historical backward sync if needed and not already in progress { - let mut state_handle = self.sync_state.write().await; + let state_handle = self.sync_state.read().await; if state_handle.current_height > height { warn!( "new height from headersub {} is lower than synced height, skipping", height ); - return; - } - if !state_handle.initial_sync_completed && !state_handle.initial_sync_in_progress { - state_handle.initial_sync_in_progress = true; drop(state_handle); - self.start_backward_sync(height).await; return; } } @@ -134,22 +138,14 @@ impl LightClient { } for epoch in epochs { + let epoch_height = epoch.height(); // Found a new finalized epoch, process it immediately if self.process_epoch(epoch).await.is_ok() { self.event_pub.send(PrismEvent::RecursiveVerificationCompleted { height }); // Update our latest known finalized epoch let mut state = self.sync_state.write().await; - state.latest_finalized_epoch = Some(height); - - // If we're waiting for initial sync, this completes it - if state.initial_sync_in_progress && !state.initial_sync_completed { - info!("finished initial sync"); - state.initial_sync_completed = true; - state.initial_sync_in_progress = false; - } - - // Update current height to the da height + state.latest_finalized_epoch = Some(epoch_height); state.current_height = height; } } @@ -184,9 +180,8 @@ impl LightClient { }; while current_height >= min_height { // Look backwards for the first height with epochs - if let Some((da_height, epochs)) = light_client - .find_most_recent_epoch(current_height, min_height, state.clone()) - .await + if let Some((da_height, epochs)) = + light_client.find_most_recent_epoch(current_height, min_height).await { // Try to find a single valid epoch for epoch in epochs { @@ -204,10 +199,10 @@ impl LightClient { ); let mut state = state.write().await; - state.initial_sync_completed = true; - state.initial_sync_in_progress = false; - state.latest_finalized_epoch = Some(epoch_height); - state.current_height = da_height; + if state.latest_finalized_epoch.is_none() { + state.latest_finalized_epoch = Some(epoch_height); + state.current_height = da_height; + } // Break out of the loop if a single epoch is processed successfully return; @@ -227,12 +222,6 @@ impl LightClient { } } } - - // No epoch found in backward search, mark initial sync as complete - // but don't update current height - we'll wait for new epochs - let mut state = state.write().await; - state.initial_sync_completed = true; - state.initial_sync_in_progress = false; } }) } @@ -241,12 +230,11 @@ impl LightClient { &self, start_height: u64, min_height: u64, - state: Arc>, ) -> Option<(u64, Vec)> { let mut height = start_height; while height >= min_height { // if an epoch has been found, we no longer need to sync historically - if state.read().await.latest_finalized_epoch.is_some() { + if self.sync_state.read().await.latest_finalized_epoch.is_some() { info!( "abandoning historical sync after finding recursive proof at incoming height" ); @@ -328,64 +316,6 @@ impl LightClient { } } - fn extract_commitments(&self, public_values: &[u8]) -> Result<(Digest, Digest)> { - let mut slice = [0u8; 32]; - slice.copy_from_slice(&public_values[..32]); - let proof_prev_commitment = Digest::from(slice); - - let mut slice = [0u8; 32]; - slice.copy_from_slice(&public_values[32..64]); - let proof_current_commitment = Digest::from(slice); - - Ok((proof_prev_commitment, proof_current_commitment)) - } - - fn verify_commitments( - &self, - finalized_epoch: &FinalizedEpoch, - proof_prev_commitment: Digest, - proof_current_commitment: Digest, - ) -> Result<()> { - if finalized_epoch.prev_commitment != proof_prev_commitment - || finalized_epoch.current_commitment != proof_current_commitment - { - // maybe we should forwards events for these kind of errors as well. - return Err(anyhow::anyhow!( - "Commitment mismatch: prev={:?}/{:?}, current={:?}/{:?}", - finalized_epoch.prev_commitment, - proof_prev_commitment, - finalized_epoch.current_commitment, - proof_current_commitment - )); - } - Ok(()) - } - - fn verify_snark_proof( - &self, - finalized_epoch: &FinalizedEpoch, - public_values: &[u8], - ) -> Result<()> { - #[cfg(target_arch = "wasm32")] - let finalized_epoch_proof = &finalized_epoch.proof; - - #[cfg(not(target_arch = "wasm32"))] - let finalized_epoch_proof = &finalized_epoch.proof.bytes(); - - let vkey = if finalized_epoch.height == 0 { - &self.sp1_vkeys.base_vk - } else { - &self.sp1_vkeys.recursive_vk - }; - - Groth16Verifier::verify( - finalized_epoch_proof, - public_values, - vkey, - &sp1_verifier::GROTH16_VK_BYTES, - ) - .map_err(|e| anyhow::anyhow!("SNARK verification failed: {:?}", e)) - } pub async fn get_latest_commitment(&self) -> Option { *self.latest_commitment.read().await } diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index a76b0018..8f7dd69e 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -217,6 +217,5 @@ async fn test_incoming_epoch_during_backwards_sync() { } let sync_state = lc.get_sync_state().await; - assert!(sync_state.initial_sync_completed); - assert!(!sync_state.initial_sync_in_progress); + assert_eq!(sync_state.current_height, 5101); } From de6e0312b3ce867a38b54dd013d71b16b05b01e8 Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 26 Jun 2025 11:35:38 +0200 Subject: [PATCH 11/12] new test cases --- crates/da/src/celestia/light_client.rs | 7 +- crates/da/src/events.rs | 14 ++- .../node_types/lightclient/src/lightclient.rs | 64 ++++------ .../node_types/lightclient/src/tests/mod.rs | 119 +++++++++++++++++- .../uniffi-lightclient/src/types.rs | 11 +- 5 files changed, 162 insertions(+), 53 deletions(-) diff --git a/crates/da/src/celestia/light_client.rs b/crates/da/src/celestia/light_client.rs index 823b8d47..0dde34e6 100644 --- a/crates/da/src/celestia/light_client.rs +++ b/crates/da/src/celestia/light_client.rs @@ -22,11 +22,7 @@ use lumina_node::{blockstore::IndexedDbBlockstore, store::IndexedDbStore}; use lumina_node::NodeBuilder; #[cfg(not(target_arch = "wasm32"))] -use { - blockstore::EitherBlockstore, - redb::Database, - tokio::task::spawn_blocking, -}; +use {blockstore::EitherBlockstore, redb::Database, tokio::task::spawn_blocking}; #[cfg(feature = "uniffi")] use lumina_node_uniffi::types::NodeConfig; @@ -183,6 +179,7 @@ impl LightDataAvailabilityLayer for LightClientConnection { Err(e) => return Err(anyhow!("Failed to fetch header: {}", e)), }; + // TODO(Zombeescott): Implement retries + timeout match node.request_all_blobs(&header, self.snark_namespace, None).await { Ok(blobs) => { let epochs: Vec = blobs diff --git a/crates/da/src/events.rs b/crates/da/src/events.rs index 348a8314..54b21a22 100644 --- a/crates/da/src/events.rs +++ b/crates/da/src/events.rs @@ -15,7 +15,8 @@ const EVENT_CHANNEL_CAPACITY: usize = 1024; #[serde(rename_all = "snake_case")] pub enum PrismEvent { Ready, - SyncStarted { height: u64 }, + BackwardsSyncStarted { height: u64 }, + BackwardsSyncCompleted { height: Option }, UpdateDAHeight { height: u64 }, EpochVerificationStarted { height: u64 }, EpochVerified { height: u64 }, @@ -42,8 +43,15 @@ impl fmt::Display for PrismEvent { "Node is ready to start sync and listening for incoming headers" ) } - PrismEvent::SyncStarted { height } => { - write!(f, "Starting sync at height {}", height) + PrismEvent::BackwardsSyncStarted { height } => { + write!(f, "Starting backwards sync at height {}", height) + } + PrismEvent::BackwardsSyncCompleted { height } => { + write!( + f, + "Backwards sync complete, found epoch: {}", + height.is_some() + ) } PrismEvent::UpdateDAHeight { height } => { write!(f, "Updated DA height to {}", height) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 3cd4064b..0502a350 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -106,7 +106,7 @@ impl LightClient { // start initial historical backward sync if not already in progress if !backwards_sync_started { backwards_sync_started = true; - spawn_task(self.clone().start_backward_sync(height)); + self.clone().start_backward_sync(height).await; continue; } @@ -159,7 +159,7 @@ impl LightClient { async fn start_backward_sync(self: Arc, network_height: u64) { info!("starting historical sync"); // Announce that sync has started - self.event_pub.send(PrismEvent::SyncStarted { + self.event_pub.send(PrismEvent::BackwardsSyncStarted { height: network_height, }); self.event_pub.send(PrismEvent::RecursiveVerificationStarted { @@ -204,6 +204,10 @@ impl LightClient { state.current_height = da_height; } + light_client.event_pub.send(PrismEvent::BackwardsSyncCompleted { + height: Some(da_height), + }); + // Break out of the loop if a single epoch is processed successfully return; } @@ -221,8 +225,15 @@ impl LightClient { } } } + } else { + // This case happens when the incoming sync finds an epoch before the backwards sync does + light_client + .event_pub + .send(PrismEvent::BackwardsSyncCompleted { height: None }); } } + // Mark initial sync as completed, without any success + light_client.event_pub.send(PrismEvent::BackwardsSyncCompleted { height: None }); }) } @@ -266,7 +277,17 @@ impl LightClient { } async fn process_epoch(&self, epoch: VerifiableEpoch) -> Result<()> { - let commitments = epoch.verify(&self.prover_pubkey, &self.sp1_vkeys)?; + let commitments = match epoch.verify(&self.prover_pubkey, &self.sp1_vkeys) { + Ok(commitments) => commitments, + Err(e) => { + error!("failed to verify epoch at height {}: {}", epoch.height(), e); + self.event_pub.send(PrismEvent::EpochVerificationFailed { + height: epoch.height(), + error: e.to_string(), + }); + return Err(anyhow::anyhow!(e)); + } + }; let curr_commitment = commitments.current; // Update latest commitment @@ -279,43 +300,6 @@ impl LightClient { Ok(()) } - /// Returns the count of successfully processed epochs - async fn process_height(&self, height: u64) -> Result { - info!("processing at DA height {}", height); - self.event_pub.send(PrismEvent::EpochVerificationStarted { height }); - - match self.da.get_finalized_epoch(height).await { - Ok(finalized_epochs) => { - if finalized_epochs.is_empty() { - self.event_pub.send(PrismEvent::NoEpochFound { height }); - } - - // Process each finalized epoch - let mut count = 0; - for epoch in finalized_epochs { - if let Err(e) = self.process_epoch(epoch).await { - let error = format!("Failed to process epoch: {}", e); - self.event_pub.send(PrismEvent::EpochVerificationFailed { - height, - error: error.clone(), - }); - } else { - count += 1; - } - } - Ok(count) - } - Err(e) => { - let error = format!("Failed to get epoch: {}", e); - self.event_pub.send(PrismEvent::EpochVerificationFailed { - height, - error: error.clone(), - }); - Err(anyhow::anyhow!(error)) - } - } - } - pub async fn get_latest_commitment(&self) -> Option { *self.latest_commitment.read().await } diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index 8f7dd69e..c0fba060 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -184,8 +184,75 @@ async fn test_incoming_sync_ignores_error() { assert_current_commitment!(lc, "d"); } -// NOTE TO SELF: Handle_new_header method's first block is sus af -// I think we need to make sure it doesnt ever run twice +#[tokio::test] +async fn test_sandwiched_epoch() { + let (lc, mut sub, publisher) = setup(mock_da![(8, "Error1", ("a", "b"), "Error2")]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 8 }); + wait_for_sync(&mut sub, 8).await; + assert_current_commitment!(lc, "b"); +} + +#[tokio::test] +async fn no_backwards_sync_underflow() { + let (_, mut sub, publisher) = setup(mock_da![]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 50 }); + wait_for_event(&mut sub, |event| { + if let PrismEvent::BackwardsSyncCompleted { height } = event { + assert!(height.is_none()); + return true; + } + false + }) + .await +} + +#[tokio::test] +async fn no_concurrent_backwards_sync() { + let (_, mut sub, publisher) = setup(mock_da![(999, ("a", "b"))]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 500 }); + publisher.send(PrismEvent::UpdateDAHeight { height: 1000 }); + + wait_for_event(&mut sub, |event| { + if let PrismEvent::BackwardsSyncStarted { height } = event { + assert_eq!(height, 500); + return true; + } + false + }) + .await; + + wait_for_event(&mut sub, |event| { + if let PrismEvent::BackwardsSyncCompleted { height } = event { + assert!(height.is_none()); + return true; + } + false + }) + .await; +} + +#[tokio::test] +async fn test_backwards_sync_does_not_restart() { + let (lc, mut sub, publisher) = setup(mock_da![(999, ("a", "b"))]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 500 }); + + wait_for_event(&mut sub, |event| { + if let PrismEvent::BackwardsSyncCompleted { height } = event { + assert!(height.is_none()); + return true; + } + false + }) + .await; + publisher.send(PrismEvent::UpdateDAHeight { height: 1000 }); + // TODO: Find better way + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(lc.get_sync_state().await.latest_finalized_epoch.is_none()); +} #[tokio::test] async fn test_will_not_process_older_epoch() { @@ -209,13 +276,57 @@ async fn test_incoming_epoch_during_backwards_sync() { publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); publisher.send(PrismEvent::UpdateDAHeight { height: 5101 }); + + let mut condition_counter = 0; + while let Ok(event_info) = sub.recv().await { + if condition_counter >= 2 { + break; + } + match event_info.event { + PrismEvent::RecursiveVerificationCompleted { height: _ } => { + assert_current_commitment!(lc, "d"); + condition_counter += 1; + } + PrismEvent::BackwardsSyncCompleted { height } => { + assert!(height.is_none()); + condition_counter += 1; + } + _ => {} + } + } +} + +#[tokio::test] +async fn test_incoming_epoch_after_backwards_sync() { + let (lc, mut sub, publisher) = setup(mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); + wait_for_event(&mut sub, |event| { + if let PrismEvent::BackwardsSyncCompleted { height } = event { + return matches!(height, Some(5000)); + } + false + }) + .await; + publisher.send(PrismEvent::UpdateDAHeight { height: 5101 }); while let Ok(event_info) = sub.recv().await { if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event { assert_current_commitment!(lc, "d"); return; } } +} - let sync_state = lc.get_sync_state().await; - assert_eq!(sync_state.current_height, 5101); +#[tokio::test] +async fn test_backwards_sync_completes() { + let (_, mut sub, publisher) = setup(mock_da![]).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); + wait_for_event(&mut sub, |event| { + if let PrismEvent::BackwardsSyncCompleted { height } = event { + return height.is_none(); + } + false + }) + .await; } diff --git a/crates/node_types/uniffi-lightclient/src/types.rs b/crates/node_types/uniffi-lightclient/src/types.rs index c346b6a4..46f7bce9 100644 --- a/crates/node_types/uniffi-lightclient/src/types.rs +++ b/crates/node_types/uniffi-lightclient/src/types.rs @@ -10,6 +10,10 @@ pub enum UniffiLightClientEvent { /// The height at which sync started height: u64, }, + SyncCompleted { + /// The height at which sync completed + height: Option, + }, /// DA layer height has been updated UpdateDAHeight { /// The new DA layer height @@ -65,7 +69,12 @@ impl From for UniffiLightClientEvent { fn from(event: PrismEvent) -> Self { match event { PrismEvent::Ready => UniffiLightClientEvent::Ready, - PrismEvent::SyncStarted { height } => UniffiLightClientEvent::SyncStarted { height }, + PrismEvent::BackwardsSyncStarted { height } => { + UniffiLightClientEvent::SyncStarted { height } + } + PrismEvent::BackwardsSyncCompleted { height } => { + UniffiLightClientEvent::SyncCompleted { height } + } PrismEvent::UpdateDAHeight { height } => { UniffiLightClientEvent::UpdateDAHeight { height } } From edf73f3b07f4bd436b31ad89544803083a47d63b Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 26 Jun 2025 12:20:55 +0200 Subject: [PATCH 12/12] improvements --- .../node_types/lightclient/src/lightclient.rs | 10 ++- .../node_types/lightclient/src/tests/mod.rs | 64 ++++++++++++++----- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/crates/node_types/lightclient/src/lightclient.rs b/crates/node_types/lightclient/src/lightclient.rs index 0502a350..ef921703 100644 --- a/crates/node_types/lightclient/src/lightclient.rs +++ b/crates/node_types/lightclient/src/lightclient.rs @@ -211,6 +211,9 @@ impl LightClient { // Break out of the loop if a single epoch is processed successfully return; } + // This is the only branch that should trigger the + // while loop to continue, the other branches all + // return Err(e) => { error!("Failed to process epoch at height {}: {}", da_height, e); light_client.event_pub.send(PrismEvent::EpochVerificationFailed { @@ -226,14 +229,15 @@ impl LightClient { } } } else { - // This case happens when the incoming sync finds an epoch before the backwards sync does + // This case happens when the incoming sync finds an epoch + // before the backwards sync does, or we have exhausted + // minimum height light_client .event_pub .send(PrismEvent::BackwardsSyncCompleted { height: None }); + return; } } - // Mark initial sync as completed, without any success - light_client.event_pub.send(PrismEvent::BackwardsSyncCompleted { height: None }); }) } diff --git a/crates/node_types/lightclient/src/tests/mod.rs b/crates/node_types/lightclient/src/tests/mod.rs index c0fba060..4ddb275a 100644 --- a/crates/node_types/lightclient/src/tests/mod.rs +++ b/crates/node_types/lightclient/src/tests/mod.rs @@ -271,28 +271,58 @@ async fn test_will_not_process_older_epoch() { } #[tokio::test] -async fn test_incoming_epoch_during_backwards_sync() { +async fn test_incoming_epoch_during_backwards_sync_v1() { let (lc, mut sub, publisher) = setup(mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]).await; - publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); - publisher.send(PrismEvent::UpdateDAHeight { height: 5101 }); + let result = tokio::time::timeout(Duration::from_secs(5), async { + // Start the event loop first, then send events after we're ready to receive + let event_task = tokio::spawn(async move { + let mut events_received = (false, false); // (recursive, backwards) - let mut condition_counter = 0; - while let Ok(event_info) = sub.recv().await { - if condition_counter >= 2 { - break; - } - match event_info.event { - PrismEvent::RecursiveVerificationCompleted { height: _ } => { - assert_current_commitment!(lc, "d"); - condition_counter += 1; - } - PrismEvent::BackwardsSyncCompleted { height } => { - assert!(height.is_none()); - condition_counter += 1; + while let Ok(event_info) = sub.recv().await { + match event_info.event { + PrismEvent::RecursiveVerificationCompleted { height: _ } => { + assert_current_commitment!(lc, "d"); + events_received.0 = true; + } + PrismEvent::BackwardsSyncCompleted { height } => { + assert!(height.is_none()); + events_received.1 = true; + } + _ => {} + } + + // Break when both conditions are met + if events_received.0 && events_received.1 { + break; + } } - _ => {} + events_received + }); + + // Small delay to ensure the receiver is ready + tokio::time::sleep(Duration::from_millis(10)).await; + + publisher.send(PrismEvent::UpdateDAHeight { height: 5100 }); + publisher.send(PrismEvent::UpdateDAHeight { height: 5101 }); + + let (recursive_completed, backwards_completed) = event_task.await.unwrap(); + (recursive_completed, backwards_completed) + }) + .await; + + match result { + Ok((recursive_completed, backwards_completed)) => { + assert!( + recursive_completed, + "RecursiveVerificationCompleted event not received" + ); + assert!( + backwards_completed, + "BackwardsSyncCompleted event not received" + ); } + Err(_) => panic!("Test timed out waiting for events"), } }