Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ sha2-v0-10-8 = { git = "https://github.com/sp1-patches/RustCrypto-hashes", packa
curve25519-dalek-ng = { git = "https://github.com/sp1-patches/curve25519-dalek-ng", tag = "patch-4.1.1-sp1-4.0.0" }
p256 = { git = "https://github.com/sp1-patches/elliptic-curves", tag = "patch-p256-13.2-sp1-4.1.0" }
k256 = { git = "https://github.com/sp1-patches/elliptic-curves", tag = "patch-k256-13.4-sp1-4.1.0" }
celestia-types = { git = "https://github.com/deltadevsde/lumina.git" }
celestia-types = { git = "https://github.com/deltadevsde/lumina.git" }
lumina-node = { git = "https://github.com/deltadevsde/lumina.git" }

[workspace.features]
Expand Down
1 change: 1 addition & 0 deletions crates/da/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl From<EpochCommitments> for (Digest, Digest) {

/// `VerifiableStateTransition` is a trait wrapper around `FinalizedEpoch` that allows for mocking.
/// The only concrete implementation of this trait is by `FinalizedEpoch`.
#[automock]
pub trait VerifiableStateTransition: Send {
fn verify(
&self,
Expand Down
3 changes: 3 additions & 0 deletions crates/node_types/lightclient/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod lightclient;
pub use lightclient::LightClient;

#[cfg(test)]
mod tests;
45 changes: 22 additions & 23 deletions crates/node_types/lightclient/src/lightclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ pub struct LightClient {
/// The event channel, used to spawn new subscribers and publishers.
event_chan: Arc<EventChannel>,
event_pub: Arc<EventPublisher>,
sync_state: Arc<RwLock<SyncState>>,

// The latest commitment.
latest_commitment: Arc<RwLock<Option<Digest>>>,
}

struct SyncState {
current_height: u64,
initial_sync_completed: bool,
initial_sync_in_progress: bool,
latest_finalized_epoch: Option<u64>,
#[derive(Default, Clone)]
pub struct SyncState {
pub current_height: u64,
pub initial_sync_completed: bool,
pub initial_sync_in_progress: bool,
pub latest_finalized_epoch: Option<u64>,
}

#[allow(dead_code)]
Expand All @@ -64,24 +66,25 @@ impl LightClient {
let event_chan = da.event_channel();
let event_pub = Arc::new(event_chan.publisher());

let sync_state = Arc::new(RwLock::new(SyncState::default()));

Self {
da,
sp1_vkeys,
prover_pubkey,
event_chan,
event_pub,
latest_commitment: Arc::new(RwLock::new(None)),
sync_state,
}
}

pub async fn get_sync_state(&self) -> SyncState {
self.sync_state.read().await.clone()
}

pub async fn run(self: Arc<Self>) -> Result<()> {
// start listening for new headers to update sync target
let sync_state = Arc::new(RwLock::new(SyncState {
current_height: 0,
initial_sync_completed: false,
initial_sync_in_progress: false,
latest_finalized_epoch: None,
}));

let mut event_sub = self.event_chan.subscribe();
while let Ok(event_info) = event_sub.recv().await {
Expand All @@ -90,27 +93,27 @@ impl LightClient {
if let Some(metrics) = get_metrics() {
metrics.record_celestia_synced_height(height, vec![]);
if let Some(latest_finalized_epoch) =
sync_state.read().await.latest_finalized_epoch
self.sync_state.read().await.latest_finalized_epoch
{
metrics.record_current_epoch(latest_finalized_epoch, vec![]);
}
}
info!("new height from headersub {}", height);
self.clone().handle_new_header(height, sync_state.clone()).await;
self.clone().handle_new_header(height).await;
}
}

Ok(())
}

async fn handle_new_header(self: Arc<Self>, height: u64, state: Arc<RwLock<SyncState>>) {
async fn handle_new_header(self: Arc<Self>, height: u64) {
// start initial historical backward sync if needed and not already in progress
{
let mut state_handle = state.write().await;
let mut state_handle = self.sync_state.write().await;
if !state_handle.initial_sync_completed && !state_handle.initial_sync_in_progress {
state_handle.initial_sync_in_progress = true;
drop(state_handle);
self.start_backward_sync(height, state.clone()).await;
self.start_backward_sync(height).await;
return;
}
}
Expand All @@ -128,7 +131,7 @@ impl LightClient {
self.event_pub.send(PrismEvent::RecursiveVerificationCompleted { height });

// Update our latest known finalized epoch
let mut state = state.write().await;
let mut state = self.sync_state.write().await;
state.latest_finalized_epoch = Some(height);

// If we're waiting for initial sync, this completes it
Expand All @@ -149,11 +152,7 @@ impl LightClient {
}
}

async fn start_backward_sync(
self: Arc<Self>,
network_height: u64,
state: Arc<RwLock<SyncState>>,
) {
async fn start_backward_sync(self: Arc<Self>, network_height: u64) {
info!("starting historical sync");
// Announce that sync has started
self.event_pub.send(PrismEvent::SyncStarted {
Expand All @@ -166,7 +165,7 @@ impl LightClient {
// Start a task to find a finalized epoch by searching backward
let light_client = Arc::clone(&self);

let state = state.clone();
let state = self.sync_state.clone();
spawn_task(async move {
// Find the most recent valid epoch by searching backward
if let Some(epoch_height) =
Expand Down
203 changes: 203 additions & 0 deletions crates/node_types/lightclient/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
use std::{sync::Arc, time::Duration};

use prism_common::digest::Digest;
use prism_da::{
MockLightDataAvailabilityLayer, MockVerifiableStateTransition, VerifiableStateTransition,
events::{EventChannel, PrismEvent},
};
use prism_keys::SigningKey;
use tokio::spawn;

use crate::LightClient;

macro_rules! mock_da {
($(($height:expr, $($spec:tt),+)),* $(,)?) => {{
let mut mock_da = MockLightDataAvailabilityLayer::new();
mock_da.expect_get_finalized_epoch().returning(move |height| {
match height {
$(
$height => {
let mut transitions = vec![];
$(
let mut epoch = MockVerifiableStateTransition::new();
mock_da!(@make_epoch epoch, $spec, $height);
transitions.push(Box::new(epoch) as Box<dyn VerifiableStateTransition>);
)+
Ok(transitions)
}
)*
_ => Ok(vec![]),
}
});
mock_da
}};

// Success case - tuple
(@make_epoch $epoch:ident, ($h1:expr, $h2:expr), $height:expr) => {
let hash1 = $h1;
let hash2 = $h2;
$epoch.expect_height().returning(move || $height);
$epoch.expect_verify().returning(move |_, _| {
Ok((Digest::hash(hash1), Digest::hash(hash2)).into())
});
};

// Error case - Err(...)
(@make_epoch $epoch:ident, Err($error:expr), $height:expr) => {
let err = $error;
$epoch.expect_height().returning(move || $height);
$epoch.expect_verify().returning(move |_, _| Err(err));
};

// String error shorthand
(@make_epoch $epoch:ident, $error:literal, $height:expr) => {
let err_msg = $error;
$epoch.expect_height().returning(move || $height);
$epoch.expect_verify().returning(move |_, _| {
Err(EpochVerificationError::ProofVerificationError(err_msg.to_string()))
});
};
}

macro_rules! wait_for_sync {
($sub:expr, $target_height:expr) => {{
while let Ok(event) = $sub.recv().await {
match event.event {
PrismEvent::EpochVerified { height } => {
if height >= $target_height {
return;
}
}
PrismEvent::EpochVerificationFailed { height, error } => {
if height >= $target_height {
// TODO: Placeholder
println!("Epoch verification failed at height {}: {}", height, error);
return;
}
}
_ => {}
}
}
}};
}

macro_rules! assert_current_commitment {
($lc:expr, $expected:expr) => {
let actual = $lc.get_latest_commitment().await.unwrap();
assert_eq!(Digest::hash($expected), actual);
};
}

#[tokio::test]
async fn test_realtime_sync() {
let mut mock_da = mock_da![
(4, ("g", "a")),
(5, ("a", "b")),
(7, ("b", "c"), ("c", "d")),
(8, ("d", "e")),
];
let chan = EventChannel::new();
let publisher = chan.publisher();
let arced_chan = Arc::new(chan);
mock_da.expect_event_channel().return_const(arced_chan.clone());

let mock_da = Arc::new(mock_da);

let prover_key = SigningKey::new_ed25519();
let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key()));

let runner = lc.clone();
spawn(async move {
runner.run().await.unwrap();
});
let mut sub = arced_chan.clone().subscribe();

//TODO: Just wait for events
tokio::time::sleep(Duration::from_secs(1)).await;
publisher.send(PrismEvent::UpdateDAHeight { height: 3 });

publisher.send(PrismEvent::UpdateDAHeight { height: 4 });
wait_for_sync!(sub, 4);
assert_current_commitment!(lc, "a");

publisher.send(PrismEvent::UpdateDAHeight { height: 5 });
wait_for_sync!(sub, 5);
assert_current_commitment!(lc, "b");

publisher.send(PrismEvent::UpdateDAHeight { height: 6 });
wait_for_sync!(sub, 6);
assert_current_commitment!(lc, "b");

publisher.send(PrismEvent::UpdateDAHeight { height: 7 });
wait_for_sync!(sub, 6);
assert_current_commitment!(lc, "d");

publisher.send(PrismEvent::UpdateDAHeight { height: 8 });
wait_for_sync!(sub, 7);
assert_current_commitment!(lc, "e");
}

#[tokio::test]
async fn test_backwards_sync() {
let mut mock_da = mock_da![(8, ("a", "b")),];
let chan = EventChannel::new();
let publisher = chan.publisher();
let arced_chan = Arc::new(chan);
mock_da.expect_event_channel().return_const(arced_chan.clone());

let mock_da = Arc::new(mock_da);

let prover_key = SigningKey::new_ed25519();
let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key()));

let runner = lc.clone();
spawn(async move {
runner.run().await.unwrap();
});
let mut sub = arced_chan.clone().subscribe();

//TODO: Just wait for events
tokio::time::sleep(Duration::from_secs(1)).await;
publisher.send(PrismEvent::UpdateDAHeight { height: 20 });
while let Ok(event_info) = sub.recv().await {
if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event {
assert_current_commitment!(lc, "b");
return;
}
}
}

#[tokio::test]
async fn test_incoming_epoch_during_backwards_sync() {
let mut mock_da = mock_da![(5000, ("a", "b")), (5101, ("c", "d"))];
let chan = EventChannel::new();
let publisher = chan.publisher();
let arced_chan = Arc::new(chan);
mock_da.expect_event_channel().return_const(arced_chan.clone());

let mock_da = Arc::new(mock_da);

let prover_key = SigningKey::new_ed25519();
let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key()));

let runner = lc.clone();
spawn(async move {
runner.run().await.unwrap();
});
let mut sub = arced_chan.clone().subscribe();

//TODO: Just wait for events
tokio::time::sleep(Duration::from_secs(1)).await;
publisher.send(PrismEvent::UpdateDAHeight { height: 5100 });
publisher.send(PrismEvent::UpdateDAHeight { height: 5101 });
while let Ok(event_info) = sub.recv().await {
if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event {
assert_current_commitment!(lc, "d");
return;
}
}

let sync_state = lc.get_sync_state().await;
assert!(sync_state.initial_sync_completed);
assert!(!sync_state.initial_sync_in_progress);
}
Loading