Skip to content

Commit 85d067f

Browse files
refactor(da): Wrapping FinalizedEpoch in a trait to make mockable (#342)
* verifiable epochs * refactor: moving event channel code to DA crate * refactor: renaming LightClientEvent to PrismEvent * feat: implement event channel for inmemory da and fn celestia * a bit more sensible errors for FinalizedEpoch verification * clippy * fix: review comments * update
1 parent a9846df commit 85d067f

File tree

23 files changed

+502
-309
lines changed

23 files changed

+502
-309
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/cfg.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use prism_storage::{
1717
use prism_telemetry::config::{TelemetryConfig, get_default_telemetry_config};
1818
use serde::{Deserialize, Serialize};
1919
use std::{fs, path::Path, str::FromStr, sync::Arc, time::Duration};
20-
use tracing::{error, info, warn};
20+
use tracing::{error, info};
2121

2222
use prism_da::{
2323
DataAvailabilityLayer, LightDataAvailabilityLayer,
@@ -201,12 +201,6 @@ pub fn load_config(args: CommandArgs) -> Result<Config> {
201201

202202
info!("Final config: {:?}", final_config);
203203

204-
if final_config.network.verifying_key.is_none() {
205-
warn!(
206-
"prover's verifying key was not provided. this is not recommended and epoch signatures will not be verified."
207-
);
208-
}
209-
210204
Ok(final_config)
211205
}
212206

@@ -301,7 +295,7 @@ fn apply_command_line_args(config: Config, args: CommandArgs) -> Config {
301295
verifying_key: args
302296
.verifying_key
303297
.and_then(|x| VerifyingKey::from_base64(x).ok())
304-
.or(network_config.clone().verifying_key),
298+
.unwrap_or(network_config.verifying_key.clone()),
305299
celestia_config,
306300
},
307301
keystore_type: args.keystore_type.or(config.keystore_type),

crates/cli/src/main.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use prism_telemetry_registry::{init::init, metrics_registry::get_metrics};
1414
use std::io::{Error, ErrorKind};
1515

1616
use node_types::NodeType;
17-
use prism_lightclient::{LightClient, events::EventChannel};
17+
use prism_lightclient::LightClient;
1818
use prism_prover::Prover;
1919
use std::sync::Arc;
2020
use tokio_util::sync::CancellationToken;
@@ -90,13 +90,7 @@ async fn main() -> std::io::Result<()> {
9090
Error::other(e.to_string())
9191
})?;
9292

93-
let event_channel = EventChannel::new();
94-
95-
Arc::new(LightClient::new(
96-
da,
97-
verifying_key,
98-
event_channel.publisher(),
99-
))
93+
Arc::new(LightClient::new(da, verifying_key))
10094
}
10195
Commands::Prover(_) => {
10296
let db = initialize_db(&config).map_err(|e| Error::other(e.to_string()))?;
@@ -155,10 +149,7 @@ async fn main() -> std::io::Result<()> {
155149

156150
let signing_key = get_signing_key(keystore_type, keystore_path)?;
157151

158-
let verifying_key =
159-
config.network.verifying_key.clone().ok_or_else(|| {
160-
Error::new(ErrorKind::NotFound, "prover verifying key not found")
161-
})?;
152+
let verifying_key = config.network.verifying_key.clone();
162153

163154
// When SP1_PROVER is set to mock, disable recursive proofs
164155
let recursive_proofs = std::env::var("SP1_PROVER").map_or(true, |val| val != "mock");

crates/da/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@ uniffi = ["dep:lumina-node-uniffi"]
1616

1717
[dependencies]
1818
blockstore = { workspace = true }
19+
thiserror = { workspace = true }
1920
async-trait = { workspace = true }
2021
serde = { workspace = true }
22+
serde_json = { workspace = true }
2123
tracing = { workspace = true }
2224
anyhow = { workspace = true }
25+
web-time = { workspace = true, features = ["serde"] }
2326
redb = { workspace = true }
2427
lumina-node = { workspace = true }
2528
prism-serde = { workspace = true }
2629
ed25519-consensus = { workspace = true }
30+
sp1-verifier = { workspace = true }
2731
celestia-types = { workspace = true }
2832
prism-errors = { workspace = true }
2933
prism-keys = { workspace = true }
3034
prism-common = { workspace = true }
3135
libp2p = { workspace = true, features = ["serde"] }
36+
mockall = { workspace = true }
3237

3338
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
3439
celestia-rpc = { workspace = true }

crates/da/src/celestia/full_node.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
#![cfg(not(target_arch = "wasm32"))]
22

3-
use crate::{FinalizedEpoch, LightDataAvailabilityLayer};
3+
use crate::{
4+
FinalizedEpoch, LightDataAvailabilityLayer, VerifiableEpoch,
5+
events::{EventChannel, PrismEvent},
6+
};
47
use anyhow::{Context, Result, anyhow};
58
use async_trait::async_trait;
69
use celestia_types::{Blob, nmt::Namespace};
7-
use lumina_node::events::EventSubscriber;
810
use prism_errors::{DataAvailabilityError, GeneralError};
911
use std::{
1012
self,
@@ -15,7 +17,7 @@ use std::{
1517
};
1618
use tracing::{error, trace};
1719

18-
use tokio::sync::{Mutex, broadcast};
20+
use tokio::sync::broadcast;
1921

2022
use crate::DataAvailabilityLayer;
2123
use celestia_rpc::{BlobClient, Client, HeaderClient, TxConfig};
@@ -34,6 +36,7 @@ pub struct CelestiaConnection {
3436

3537
height_update_tx: broadcast::Sender<u64>,
3638
sync_target: Arc<AtomicU64>,
39+
event_channel: Arc<EventChannel>,
3740
}
3841

3942
impl CelestiaConnection {
@@ -55,30 +58,32 @@ impl CelestiaConnection {
5558
))?;
5659

5760
let (height_update_tx, _) = broadcast::channel(100);
61+
let event_channel = Arc::new(EventChannel::new());
5862

5963
Ok(CelestiaConnection {
6064
client,
6165
snark_namespace,
6266
operation_namespace,
6367
height_update_tx,
6468
sync_target: Arc::new(AtomicU64::new(0)),
69+
event_channel,
6570
})
6671
}
6772
}
6873

6974
#[async_trait]
7075
impl LightDataAvailabilityLayer for CelestiaConnection {
71-
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
76+
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<VerifiableEpoch>> {
7277
trace!("searching for epoch on da layer at height {}", height);
7378

7479
match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
7580
Ok(maybe_blobs) => match maybe_blobs {
7681
Some(blobs) => {
77-
let valid_epoch: Vec<FinalizedEpoch> = blobs
82+
let valid_epochs: Vec<VerifiableEpoch> = blobs
7883
.into_iter()
7984
.filter_map(|blob| {
8085
match FinalizedEpoch::try_from(&blob) {
81-
Ok(epoch) => Some(epoch),
86+
Ok(epoch) => Some(Box::new(epoch) as VerifiableEpoch),
8287
Err(e) => {
8388
warn!(
8489
"Ignoring blob: marshalling blob from height {} to epoch json failed with error {}: {:?}",
@@ -89,7 +94,7 @@ impl LightDataAvailabilityLayer for CelestiaConnection {
8994
}
9095
})
9196
.collect();
92-
Ok(valid_epoch)
97+
Ok(valid_epochs)
9398
}
9499
None => Ok(vec![]),
95100
},
@@ -106,8 +111,8 @@ impl LightDataAvailabilityLayer for CelestiaConnection {
106111
}
107112
}
108113

109-
fn event_subscriber(&self) -> Option<Arc<Mutex<EventSubscriber>>> {
110-
None
114+
fn event_channel(&self) -> Arc<EventChannel> {
115+
self.event_channel.clone()
111116
}
112117
}
113118

@@ -120,6 +125,7 @@ impl DataAvailabilityLayer for CelestiaConnection {
120125

121126
let sync_target = self.sync_target.clone();
122127
let height_update_tx = self.height_update_tx.clone();
128+
let event_publisher = self.event_channel.publisher();
123129

124130
spawn(async move {
125131
while let Some(extended_header_result) = header_sub.next().await {
@@ -130,6 +136,8 @@ impl DataAvailabilityLayer for CelestiaConnection {
130136
// todo: correct error handling
131137
let _ = height_update_tx.send(height);
132138
trace!("updated sync target for height {}", height);
139+
140+
event_publisher.send(PrismEvent::UpdateDAHeight { height });
133141
}
134142
Err(e) => {
135143
error!("Error retrieving header from DA layer: {}", e);

crates/da/src/celestia/light_client.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
use super::utils::{NetworkConfig, create_namespace};
2-
use crate::{FinalizedEpoch, LightDataAvailabilityLayer};
2+
use crate::{
3+
FinalizedEpoch, LightDataAvailabilityLayer, VerifiableEpoch,
4+
events::{EventChannel, EventPublisher},
5+
};
36
use anyhow::{Result, anyhow};
47
use async_trait::async_trait;
58
use celestia_types::nmt::Namespace;
6-
use lumina_node::{Node, NodeError, events::EventSubscriber, store::StoreError};
9+
use lumina_node::{
10+
Node, NodeError,
11+
blockstore::InMemoryBlockstore,
12+
store::{EitherStore, InMemoryStore, StoreError},
13+
};
714
use prism_errors::DataAvailabilityError;
815
use std::{self, sync::Arc};
916
use tokio::sync::{Mutex, RwLock};
@@ -17,8 +24,6 @@ use lumina_node::NodeBuilder;
1724
#[cfg(not(target_arch = "wasm32"))]
1825
use {
1926
blockstore::EitherBlockstore,
20-
lumina_node::blockstore::InMemoryBlockstore,
21-
lumina_node::store::{EitherStore, InMemoryStore},
2227
redb::Database,
2328
tokio::task::spawn_blocking,
2429
};
@@ -40,7 +45,7 @@ pub type LuminaNode = Node<
4045

4146
pub struct LightClientConnection {
4247
pub node: Arc<RwLock<LuminaNode>>,
43-
pub event_subscriber: Arc<Mutex<EventSubscriber>>,
48+
pub event_channel: Arc<EventChannel>,
4449
pub snark_namespace: Namespace,
4550
}
4651

@@ -98,11 +103,16 @@ impl LightClientConnection {
98103
.start_subscribed()
99104
.await?;
100105

106+
let lumina_sub = Arc::new(Mutex::new(event_subscriber));
107+
108+
// Creates an EventChannel that starts forwarding lumina events to the subscriber
109+
let prism_chan = EventChannel::from(lumina_sub.clone());
110+
101111
let snark_namespace = create_namespace(&celestia_config.snark_namespace_id)?;
102112

103113
Ok(LightClientConnection {
104114
node: Arc::new(RwLock::new(node)),
105-
event_subscriber: Arc::new(Mutex::new(event_subscriber)),
115+
event_channel: Arc::new(prism_chan),
106116
snark_namespace,
107117
})
108118
}
@@ -129,25 +139,33 @@ impl LightClientConnection {
129139
.await?;
130140
let (node, event_subscriber) = node_builder.start_subscribed().await?;
131141

142+
let lumina_sub = Arc::new(Mutex::new(event_subscriber));
143+
144+
// Creates an EventChannel that starts forwarding lumina events to the subscriber
145+
let prism_chan = EventChannel::from(lumina_sub.clone());
146+
132147
let snark_namespace = create_namespace(&celestia_config.snark_namespace_id)?;
133148

134149
Ok(LightClientConnection {
135150
node: Arc::new(RwLock::new(node)),
136-
event_subscriber: Arc::new(Mutex::new(event_subscriber)),
151+
event_channel: Arc::new(prism_chan),
137152
snark_namespace,
138153
})
139154
}
155+
156+
pub fn event_publisher(&self) -> EventPublisher {
157+
self.event_channel.publisher()
158+
}
140159
}
141160

142161
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
143162
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
144163
impl LightDataAvailabilityLayer for LightClientConnection {
145-
// since the lumina node is already started in the constructor, we don't need to start it again. We need the event_subscriber to start forwarding events.
146-
fn event_subscriber(&self) -> Option<Arc<Mutex<EventSubscriber>>> {
147-
Some(self.event_subscriber.clone())
164+
fn event_channel(&self) -> Arc<EventChannel> {
165+
self.event_channel.clone()
148166
}
149167

150-
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
168+
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<VerifiableEpoch>> {
151169
trace!(
152170
"searching for epoch on da layer at height {} under namespace",
153171
height
@@ -167,10 +185,10 @@ impl LightDataAvailabilityLayer for LightClientConnection {
167185

168186
match node.request_all_blobs(&header, self.snark_namespace, None).await {
169187
Ok(blobs) => {
170-
let epochs: Vec<FinalizedEpoch> = blobs
188+
let epochs: Vec<VerifiableEpoch> = blobs
171189
.into_iter()
172190
.filter_map(|blob| match FinalizedEpoch::try_from(&blob) {
173-
Ok(epoch) => Some(epoch),
191+
Ok(epoch) => Some(Box::new(epoch) as VerifiableEpoch),
174192
Err(_) => {
175193
warn!(
176194
"marshalling blob from height {} to epoch json: {:?}",

crates/da/src/celestia/utils.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use std::{str::FromStr, time::Duration, fmt};
1+
use std::{fmt, str::FromStr, time::Duration};
22

33
use anyhow::{Context, Result};
44
use celestia_types::nmt::Namespace;
55
use lumina_node::{
66
network::Network as CelestiaNetwork,
77
node::{DEFAULT_PRUNING_DELAY, DEFAULT_SAMPLING_WINDOW},
88
};
9-
use prism_keys::VerifyingKey;
9+
use prism_keys::{SigningKey, VerifyingKey};
1010
use prism_serde::{self, base64::FromBase64, hex::FromHex};
1111
use serde::{Deserialize, Serialize};
1212

@@ -53,7 +53,7 @@ pub struct NetworkConfig {
5353
pub network: Network,
5454
pub celestia_network: CelestiaNetwork,
5555
/// The verifying key of the prover
56-
pub verifying_key: Option<VerifyingKey>,
56+
pub verifying_key: VerifyingKey,
5757
pub celestia_config: Option<CelestiaConfig>,
5858
}
5959

@@ -62,7 +62,8 @@ impl Default for NetworkConfig {
6262
NetworkConfig {
6363
network: Network::Custom("custom".to_string()),
6464
celestia_network: CelestiaNetwork::custom("private").unwrap(),
65-
verifying_key: None,
65+
// TODO: This is just a placeholder, don't let this get merged
66+
verifying_key: SigningKey::new_ed25519().verifying_key(),
6667
celestia_config: None,
6768
}
6869
}
@@ -85,10 +86,10 @@ impl Network {
8586
Network::Specter => NetworkConfig {
8687
network: Network::Specter,
8788
celestia_network: CelestiaNetwork::Mocha,
88-
verifying_key: Some(
89-
VerifyingKey::from_base64("L2ilppK59Kq3aAMB/wpxdVGaI53DHPMdY6fcRodyFaA=")
90-
.unwrap(),
91-
),
89+
verifying_key: VerifyingKey::from_base64(
90+
"L2ilppK59Kq3aAMB/wpxdVGaI53DHPMdY6fcRodyFaA=",
91+
)
92+
.unwrap(),
9293
celestia_config: Some(CelestiaConfig {
9394
start_height: 5725333,
9495
snark_namespace_id: "000000000000000000000000000000000000707269736d5350457331"

0 commit comments

Comments
 (0)