Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ sha2-v0-10-8 = { git = "https://github.com/sp1-patches/RustCrypto-hashes", packa
curve25519-dalek-ng = { git = "https://github.com/sp1-patches/curve25519-dalek-ng", tag = "patch-4.1.1-sp1-4.0.0" }
p256 = { git = "https://github.com/sp1-patches/elliptic-curves", tag = "patch-p256-13.2-sp1-4.1.0" }
k256 = { git = "https://github.com/sp1-patches/elliptic-curves", tag = "patch-k256-13.4-sp1-4.1.0" }
celestia-types = { git = "https://github.com/deltadevsde/lumina.git" }
celestia-types = { git = "https://github.com/deltadevsde/lumina.git" }
lumina-node = { git = "https://github.com/deltadevsde/lumina.git" }

[workspace.features]
Expand Down
9 changes: 8 additions & 1 deletion crates/da/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use lumina_node::events::{NodeEvent, EventSubscriber as LuminaEventSub};
use lumina_node::events::{EventSubscriber as LuminaEventSub, NodeEvent};
use prism_common::digest::Digest;
use serde::Serialize;
use std::{fmt, sync::Arc};
Expand All @@ -14,6 +14,7 @@ const EVENT_CHANNEL_CAPACITY: usize = 1024;
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum PrismEvent {
Ready,
SyncStarted { height: u64 },
UpdateDAHeight { height: u64 },
EpochVerificationStarted { height: u64 },
Expand All @@ -35,6 +36,12 @@ pub enum PrismEvent {
impl fmt::Display for PrismEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PrismEvent::Ready => {
write!(
f,
"Node is ready to start sync and listening for incoming headers"
)
}
PrismEvent::SyncStarted { height } => {
write!(f, "Starting sync at height {}", height)
}
Expand Down
1 change: 1 addition & 0 deletions crates/da/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl From<EpochCommitments> for (Digest, Digest) {

/// `VerifiableStateTransition` is a trait wrapper around `FinalizedEpoch` that allows for mocking.
/// The only concrete implementation of this trait is by `FinalizedEpoch`.
#[automock]
pub trait VerifiableStateTransition: Send {
fn verify(
&self,
Expand Down
3 changes: 3 additions & 0 deletions crates/node_types/lightclient/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod lightclient;
pub use lightclient::LightClient;

#[cfg(test)]
mod tests;
46 changes: 23 additions & 23 deletions crates/node_types/lightclient/src/lightclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ pub struct LightClient {
/// The event channel, used to spawn new subscribers and publishers.
event_chan: Arc<EventChannel>,
event_pub: Arc<EventPublisher>,
sync_state: Arc<RwLock<SyncState>>,

// The latest commitment.
latest_commitment: Arc<RwLock<Option<Digest>>>,
}

struct SyncState {
current_height: u64,
initial_sync_completed: bool,
initial_sync_in_progress: bool,
latest_finalized_epoch: Option<u64>,
#[derive(Default, Clone)]
pub struct SyncState {
pub current_height: u64,
pub initial_sync_completed: bool,
pub initial_sync_in_progress: bool,
pub latest_finalized_epoch: Option<u64>,
}

#[allow(dead_code)]
Expand All @@ -64,53 +66,55 @@ impl LightClient {
let event_chan = da.event_channel();
let event_pub = Arc::new(event_chan.publisher());

let sync_state = Arc::new(RwLock::new(SyncState::default()));

Self {
da,
sp1_vkeys,
prover_pubkey,
event_chan,
event_pub,
latest_commitment: Arc::new(RwLock::new(None)),
sync_state,
}
}

pub async fn get_sync_state(&self) -> SyncState {
self.sync_state.read().await.clone()
}

pub async fn run(self: Arc<Self>) -> Result<()> {
// start listening for new headers to update sync target
let sync_state = Arc::new(RwLock::new(SyncState {
current_height: 0,
initial_sync_completed: false,
initial_sync_in_progress: false,
latest_finalized_epoch: None,
}));

let mut event_sub = self.event_chan.subscribe();
self.event_pub.send(PrismEvent::Ready);
while let Ok(event_info) = event_sub.recv().await {
if let PrismEvent::UpdateDAHeight { height } = event_info.event {
#[cfg(feature = "telemetry")]
if let Some(metrics) = get_metrics() {
metrics.record_celestia_synced_height(height, vec![]);
if let Some(latest_finalized_epoch) =
sync_state.read().await.latest_finalized_epoch
self.sync_state.read().await.latest_finalized_epoch
{
metrics.record_current_epoch(latest_finalized_epoch, vec![]);
}
}
info!("new height from headersub {}", height);
self.clone().handle_new_header(height, sync_state.clone()).await;
self.clone().handle_new_header(height).await;
}
}

Ok(())
}

async fn handle_new_header(self: Arc<Self>, height: u64, state: Arc<RwLock<SyncState>>) {
async fn handle_new_header(self: Arc<Self>, height: u64) {
// start initial historical backward sync if needed and not already in progress
{
let mut state_handle = state.write().await;
let mut state_handle = self.sync_state.write().await;
if !state_handle.initial_sync_completed && !state_handle.initial_sync_in_progress {
state_handle.initial_sync_in_progress = true;
drop(state_handle);
self.start_backward_sync(height, state.clone()).await;
self.start_backward_sync(height).await;
return;
}
}
Expand All @@ -128,7 +132,7 @@ impl LightClient {
self.event_pub.send(PrismEvent::RecursiveVerificationCompleted { height });

// Update our latest known finalized epoch
let mut state = state.write().await;
let mut state = self.sync_state.write().await;
state.latest_finalized_epoch = Some(height);

// If we're waiting for initial sync, this completes it
Expand All @@ -149,11 +153,7 @@ impl LightClient {
}
}

async fn start_backward_sync(
self: Arc<Self>,
network_height: u64,
state: Arc<RwLock<SyncState>>,
) {
async fn start_backward_sync(self: Arc<Self>, network_height: u64) {
info!("starting historical sync");
// Announce that sync has started
self.event_pub.send(PrismEvent::SyncStarted {
Expand All @@ -166,7 +166,7 @@ impl LightClient {
// Start a task to find a finalized epoch by searching backward
let light_client = Arc::clone(&self);

let state = state.clone();
let state = self.sync_state.clone();
spawn_task(async move {
// Find the most recent valid epoch by searching backward
if let Some(epoch_height) =
Expand Down
174 changes: 174 additions & 0 deletions crates/node_types/lightclient/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use std::sync::Arc;

use prism_common::digest::Digest;
use prism_da::{
MockLightDataAvailabilityLayer, MockVerifiableStateTransition, VerifiableStateTransition,
events::{EventChannel, EventPublisher, EventSubscriber, PrismEvent},
};
use prism_keys::SigningKey;
use tokio::spawn;

use crate::LightClient;

macro_rules! mock_da {
($(($height:expr, $($spec:tt),+)),* $(,)?) => {{
let mut mock_da = MockLightDataAvailabilityLayer::new();
mock_da.expect_get_finalized_epoch().returning(move |height| {
match height {
$(
$height => {
let mut transitions = vec![];
$(
let mut epoch = MockVerifiableStateTransition::new();
mock_da!(@make_epoch epoch, $spec, $height);
transitions.push(Box::new(epoch) as Box<dyn VerifiableStateTransition>);
)+
Ok(transitions)
}
)*
_ => Ok(vec![]),
}
});
mock_da
}};

// Success case - tuple
(@make_epoch $epoch:ident, ($h1:expr, $h2:expr), $height:expr) => {
let hash1 = $h1;
let hash2 = $h2;
$epoch.expect_height().returning(move || $height);
$epoch.expect_verify().returning(move |_, _| {
Ok((Digest::hash(hash1), Digest::hash(hash2)).into())
});
};

// Error case - Err(...)
(@make_epoch $epoch:ident, Err($error:expr), $height:expr) => {
let err = $error;
$epoch.expect_height().returning(move || $height);
$epoch.expect_verify().returning(move |_, _| Err(err));
};

// String error shorthand
(@make_epoch $epoch:ident, $error:literal, $height:expr) => {
let err_msg = $error;
$epoch.expect_height().returning(move || $height);
$epoch.expect_verify().returning(move |_, _| {
Err(EpochVerificationError::ProofVerificationError(err_msg.to_string()))
});
};
}

async fn wait_for_sync(sub: &mut EventSubscriber, target_height: u64) {
wait_for_event(sub, |event| match event {
PrismEvent::EpochVerified { height } => height >= target_height,
PrismEvent::EpochVerificationFailed { height, .. } => height >= target_height,
_ => false,
})
.await;
}

macro_rules! assert_current_commitment {
($lc:expr, $expected:expr) => {
let actual = $lc.get_latest_commitment().await.unwrap();
assert_eq!(Digest::hash($expected), actual);
};
}

async fn wait_for_event<F>(sub: &mut EventSubscriber, mut handler: F)
where
F: FnMut(PrismEvent) -> bool, // return true to break the loop
{
while let Ok(event_info) = sub.recv().await {
if handler(event_info.event) {
break;
}
}
}

async fn setup(
mut mock_da: MockLightDataAvailabilityLayer,
) -> (Arc<LightClient>, EventSubscriber, EventPublisher) {
let chan = EventChannel::new();
let publisher = chan.publisher();
let arced_chan = Arc::new(chan);
mock_da.expect_event_channel().return_const(arced_chan.clone());

let mock_da = Arc::new(mock_da);

let prover_key = SigningKey::new_ed25519();
let lc = Arc::new(LightClient::new(mock_da, prover_key.verifying_key()));

let runner = lc.clone();
spawn(async move {
runner.run().await.unwrap();
});
let mut sub = arced_chan.clone().subscribe();
wait_for_event(&mut sub, |event| matches!(event, PrismEvent::Ready)).await;
(lc, sub, publisher)
}

#[tokio::test]
async fn test_realtime_sync() {
let (lc, mut sub, publisher) = setup(mock_da![
(4, ("g", "a")),
(5, ("a", "b")),
(7, ("b", "c"), ("c", "d")),
(8, ("d", "e")),
])
.await;

publisher.send(PrismEvent::UpdateDAHeight { height: 3 });

publisher.send(PrismEvent::UpdateDAHeight { height: 4 });
wait_for_sync(&mut sub, 4).await;
assert_current_commitment!(lc, "a");

publisher.send(PrismEvent::UpdateDAHeight { height: 5 });
wait_for_sync(&mut sub, 5).await;
assert_current_commitment!(lc, "b");

publisher.send(PrismEvent::UpdateDAHeight { height: 6 });
wait_for_sync(&mut sub, 6).await;
assert_current_commitment!(lc, "b");

publisher.send(PrismEvent::UpdateDAHeight { height: 7 });
wait_for_sync(&mut sub, 7).await;
assert_current_commitment!(lc, "d");

publisher.send(PrismEvent::UpdateDAHeight { height: 8 });
wait_for_sync(&mut sub, 8).await;
assert_current_commitment!(lc, "e");
}

#[tokio::test]
async fn test_backwards_sync() {
let (lc, mut sub, publisher) = setup(mock_da![(8, ("a", "b"))]).await;

publisher.send(PrismEvent::UpdateDAHeight { height: 20 });
while let Ok(event_info) = sub.recv().await {
if let PrismEvent::RecursiveVerificationCompleted { height } = event_info.event {
assert_eq!(height, 8);
assert_current_commitment!(lc, "b");
return;
}
}
}

#[tokio::test]
async fn test_incoming_epoch_during_backwards_sync() {
let (lc, mut sub, publisher) = setup(mock_da![(5000, ("a", "b")), (5101, ("c", "d"))]).await;

publisher.send(PrismEvent::UpdateDAHeight { height: 5100 });
publisher.send(PrismEvent::UpdateDAHeight { height: 5101 });
while let Ok(event_info) = sub.recv().await {
if let PrismEvent::RecursiveVerificationCompleted { height: _ } = event_info.event {
assert_current_commitment!(lc, "d");
return;
}
}

let sync_state = lc.get_sync_state().await;
assert!(sync_state.initial_sync_completed);
assert!(!sync_state.initial_sync_in_progress);
}
3 changes: 3 additions & 0 deletions crates/node_types/uniffi-lightclient/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use prism_da::events::PrismEvent;
/// Event types emitted by the LightClient.
#[derive(uniffi::Enum)]
pub enum UniffiLightClientEvent {
/// LightClient is ready to start syncing
Ready,
/// Sync has started at a specific height
SyncStarted {
/// The height at which sync started
Expand Down Expand Up @@ -62,6 +64,7 @@ pub enum UniffiLightClientEvent {
impl From<PrismEvent> for UniffiLightClientEvent {
fn from(event: PrismEvent) -> Self {
match event {
PrismEvent::Ready => UniffiLightClientEvent::Ready,
PrismEvent::SyncStarted { height } => UniffiLightClientEvent::SyncStarted { height },
PrismEvent::UpdateDAHeight { height } => {
UniffiLightClientEvent::UpdateDAHeight { height }
Expand Down
Loading