Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 2 additions & 8 deletions crates/cli/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use prism_storage::{
use prism_telemetry::config::{TelemetryConfig, get_default_telemetry_config};
use serde::{Deserialize, Serialize};
use std::{fs, path::Path, str::FromStr, sync::Arc, time::Duration};
use tracing::{error, info, warn};
use tracing::{error, info};

use prism_da::{
DataAvailabilityLayer, LightDataAvailabilityLayer,
Expand Down Expand Up @@ -201,12 +201,6 @@ pub fn load_config(args: CommandArgs) -> Result<Config> {

info!("Final config: {:?}", final_config);

if final_config.network.verifying_key.is_none() {
warn!(
"prover's verifying key was not provided. this is not recommended and epoch signatures will not be verified."
);
}

Ok(final_config)
}

Expand Down Expand Up @@ -301,7 +295,7 @@ fn apply_command_line_args(config: Config, args: CommandArgs) -> Config {
verifying_key: args
.verifying_key
.and_then(|x| VerifyingKey::from_base64(x).ok())
.or(network_config.clone().verifying_key),
.unwrap_or(network_config.verifying_key.clone()),
celestia_config,
},
keystore_type: args.keystore_type.or(config.keystore_type),
Expand Down
15 changes: 3 additions & 12 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use prism_telemetry_registry::{init::init, metrics_registry::get_metrics};
use std::io::{Error, ErrorKind};

use node_types::NodeType;
use prism_lightclient::{LightClient, events::EventChannel};
use prism_lightclient::LightClient;
use prism_prover::Prover;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -90,13 +90,7 @@ async fn main() -> std::io::Result<()> {
Error::other(e.to_string())
})?;

let event_channel = EventChannel::new();

Arc::new(LightClient::new(
da,
verifying_key,
event_channel.publisher(),
))
Arc::new(LightClient::new(da, verifying_key))
}
Commands::Prover(_) => {
let db = initialize_db(&config).map_err(|e| Error::other(e.to_string()))?;
Expand Down Expand Up @@ -155,10 +149,7 @@ async fn main() -> std::io::Result<()> {

let signing_key = get_signing_key(keystore_type, keystore_path)?;

let verifying_key =
config.network.verifying_key.clone().ok_or_else(|| {
Error::new(ErrorKind::NotFound, "prover verifying key not found")
})?;
let verifying_key = config.network.verifying_key.clone();

// When SP1_PROVER is set to mock, disable recursive proofs
let recursive_proofs = std::env::var("SP1_PROVER").map_or(true, |val| val != "mock");
Expand Down
5 changes: 5 additions & 0 deletions crates/da/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ uniffi = ["dep:lumina-node-uniffi"]

[dependencies]
blockstore = { workspace = true }
thiserror = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
web-time = { workspace = true, features = ["serde"] }
redb = { workspace = true }
lumina-node = { workspace = true }
prism-serde = { workspace = true }
ed25519-consensus = { workspace = true }
sp1-verifier = { workspace = true }
celestia-types = { workspace = true }
prism-errors = { workspace = true }
prism-keys = { workspace = true }
prism-common = { workspace = true }
libp2p = { workspace = true, features = ["serde"] }
mockall = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
celestia-rpc = { workspace = true }
Expand Down
26 changes: 17 additions & 9 deletions crates/da/src/celestia/full_node.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![cfg(not(target_arch = "wasm32"))]

use crate::{FinalizedEpoch, LightDataAvailabilityLayer};
use crate::{
FinalizedEpoch, LightDataAvailabilityLayer, VerifiableEpoch,
events::{EventChannel, PrismEvent},
};
use anyhow::{Context, Result, anyhow};
use async_trait::async_trait;
use celestia_types::{Blob, nmt::Namespace};
use lumina_node::events::EventSubscriber;
use prism_errors::{DataAvailabilityError, GeneralError};
use std::{
self,
Expand All @@ -15,7 +17,7 @@ use std::{
};
use tracing::{error, trace};

use tokio::sync::{Mutex, broadcast};
use tokio::sync::broadcast;

use crate::DataAvailabilityLayer;
use celestia_rpc::{BlobClient, Client, HeaderClient, TxConfig};
Expand All @@ -34,6 +36,7 @@ pub struct CelestiaConnection {

height_update_tx: broadcast::Sender<u64>,
sync_target: Arc<AtomicU64>,
event_channel: Arc<EventChannel>,
}

impl CelestiaConnection {
Expand All @@ -55,30 +58,32 @@ impl CelestiaConnection {
))?;

let (height_update_tx, _) = broadcast::channel(100);
let event_channel = Arc::new(EventChannel::new());

Ok(CelestiaConnection {
client,
snark_namespace,
operation_namespace,
height_update_tx,
sync_target: Arc::new(AtomicU64::new(0)),
event_channel,
})
}
}

#[async_trait]
impl LightDataAvailabilityLayer for CelestiaConnection {
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<VerifiableEpoch>> {
trace!("searching for epoch on da layer at height {}", height);

match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(maybe_blobs) => match maybe_blobs {
Some(blobs) => {
let valid_epoch: Vec<FinalizedEpoch> = blobs
let valid_epochs: Vec<VerifiableEpoch> = blobs
.into_iter()
.filter_map(|blob| {
match FinalizedEpoch::try_from(&blob) {
Ok(epoch) => Some(epoch),
Ok(epoch) => Some(Box::new(epoch) as VerifiableEpoch),
Err(e) => {
warn!(
"Ignoring blob: marshalling blob from height {} to epoch json failed with error {}: {:?}",
Expand All @@ -89,7 +94,7 @@ impl LightDataAvailabilityLayer for CelestiaConnection {
}
})
.collect();
Ok(valid_epoch)
Ok(valid_epochs)
}
None => Ok(vec![]),
},
Expand All @@ -106,8 +111,8 @@ impl LightDataAvailabilityLayer for CelestiaConnection {
}
}

fn event_subscriber(&self) -> Option<Arc<Mutex<EventSubscriber>>> {
None
fn event_channel(&self) -> Arc<EventChannel> {
self.event_channel.clone()
}
}

Expand All @@ -120,6 +125,7 @@ impl DataAvailabilityLayer for CelestiaConnection {

let sync_target = self.sync_target.clone();
let height_update_tx = self.height_update_tx.clone();
let event_publisher = self.event_channel.publisher();

spawn(async move {
while let Some(extended_header_result) = header_sub.next().await {
Expand All @@ -130,6 +136,8 @@ impl DataAvailabilityLayer for CelestiaConnection {
// todo: correct error handling
let _ = height_update_tx.send(height);
trace!("updated sync target for height {}", height);

event_publisher.send(PrismEvent::UpdateDAHeight { height });
}
Err(e) => {
error!("Error retrieving header from DA layer: {}", e);
Expand Down
44 changes: 31 additions & 13 deletions crates/da/src/celestia/light_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use super::utils::{NetworkConfig, create_namespace};
use crate::{FinalizedEpoch, LightDataAvailabilityLayer};
use crate::{
FinalizedEpoch, LightDataAvailabilityLayer, VerifiableEpoch,
events::{EventChannel, EventPublisher},
};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use celestia_types::nmt::Namespace;
use lumina_node::{Node, NodeError, events::EventSubscriber, store::StoreError};
use lumina_node::{
Node, NodeError,
blockstore::InMemoryBlockstore,
store::{EitherStore, InMemoryStore, StoreError},
};
use prism_errors::DataAvailabilityError;
use std::{self, sync::Arc};
use tokio::sync::{Mutex, RwLock};
Expand All @@ -17,8 +24,6 @@ use lumina_node::NodeBuilder;
#[cfg(not(target_arch = "wasm32"))]
use {
blockstore::EitherBlockstore,
lumina_node::blockstore::InMemoryBlockstore,
lumina_node::store::{EitherStore, InMemoryStore},
redb::Database,
tokio::task::spawn_blocking,
};
Expand All @@ -40,7 +45,7 @@ pub type LuminaNode = Node<

pub struct LightClientConnection {
pub node: Arc<RwLock<LuminaNode>>,
pub event_subscriber: Arc<Mutex<EventSubscriber>>,
pub event_channel: Arc<EventChannel>,
pub snark_namespace: Namespace,
}

Expand Down Expand Up @@ -98,11 +103,16 @@ impl LightClientConnection {
.start_subscribed()
.await?;

let lumina_sub = Arc::new(Mutex::new(event_subscriber));

// Creates an EventChannel that starts forwarding lumina events to the subscriber
let prism_chan = EventChannel::from(lumina_sub.clone());

let snark_namespace = create_namespace(&celestia_config.snark_namespace_id)?;

Ok(LightClientConnection {
node: Arc::new(RwLock::new(node)),
event_subscriber: Arc::new(Mutex::new(event_subscriber)),
event_channel: Arc::new(prism_chan),
snark_namespace,
})
}
Expand All @@ -129,25 +139,33 @@ impl LightClientConnection {
.await?;
let (node, event_subscriber) = node_builder.start_subscribed().await?;

let lumina_sub = Arc::new(Mutex::new(event_subscriber));

// Creates an EventChannel that starts forwarding lumina events to the subscriber
let prism_chan = EventChannel::from(lumina_sub.clone());

let snark_namespace = create_namespace(&celestia_config.snark_namespace_id)?;

Ok(LightClientConnection {
node: Arc::new(RwLock::new(node)),
event_subscriber: Arc::new(Mutex::new(event_subscriber)),
event_channel: Arc::new(prism_chan),
snark_namespace,
})
}

pub fn event_publisher(&self) -> EventPublisher {
self.event_channel.publisher()
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl LightDataAvailabilityLayer for LightClientConnection {
// since the lumina node is already started in the constructor, we don't need to start it again. We need the event_subscriber to start forwarding events.
fn event_subscriber(&self) -> Option<Arc<Mutex<EventSubscriber>>> {
Some(self.event_subscriber.clone())
fn event_channel(&self) -> Arc<EventChannel> {
self.event_channel.clone()
}

async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_finalized_epoch(&self, height: u64) -> Result<Vec<VerifiableEpoch>> {
trace!(
"searching for epoch on da layer at height {} under namespace",
height
Expand All @@ -167,10 +185,10 @@ impl LightDataAvailabilityLayer for LightClientConnection {

match node.request_all_blobs(&header, self.snark_namespace, None).await {
Ok(blobs) => {
let epochs: Vec<FinalizedEpoch> = blobs
let epochs: Vec<VerifiableEpoch> = blobs
.into_iter()
.filter_map(|blob| match FinalizedEpoch::try_from(&blob) {
Ok(epoch) => Some(epoch),
Ok(epoch) => Some(Box::new(epoch) as VerifiableEpoch),
Err(_) => {
warn!(
"marshalling blob from height {} to epoch json: {:?}",
Expand Down
17 changes: 9 additions & 8 deletions crates/da/src/celestia/utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{str::FromStr, time::Duration, fmt};
use std::{fmt, str::FromStr, time::Duration};

use anyhow::{Context, Result};
use celestia_types::nmt::Namespace;
use lumina_node::{
network::Network as CelestiaNetwork,
node::{DEFAULT_PRUNING_DELAY, DEFAULT_SAMPLING_WINDOW},
};
use prism_keys::VerifyingKey;
use prism_keys::{SigningKey, VerifyingKey};
use prism_serde::{self, base64::FromBase64, hex::FromHex};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct NetworkConfig {
pub network: Network,
pub celestia_network: CelestiaNetwork,
/// The verifying key of the prover
pub verifying_key: Option<VerifyingKey>,
pub verifying_key: VerifyingKey,
pub celestia_config: Option<CelestiaConfig>,
}

Expand All @@ -62,7 +62,8 @@ impl Default for NetworkConfig {
NetworkConfig {
network: Network::Custom("custom".to_string()),
celestia_network: CelestiaNetwork::custom("private").unwrap(),
verifying_key: None,
// TODO: This is just a placeholder, don't let this get merged
verifying_key: SigningKey::new_ed25519().verifying_key(),
celestia_config: None,
}
}
Expand All @@ -85,10 +86,10 @@ impl Network {
Network::Specter => NetworkConfig {
network: Network::Specter,
celestia_network: CelestiaNetwork::Mocha,
verifying_key: Some(
VerifyingKey::from_base64("L2ilppK59Kq3aAMB/wpxdVGaI53DHPMdY6fcRodyFaA=")
.unwrap(),
),
verifying_key: VerifyingKey::from_base64(
"L2ilppK59Kq3aAMB/wpxdVGaI53DHPMdY6fcRodyFaA=",
)
.unwrap(),
celestia_config: Some(CelestiaConfig {
start_height: 5725333,
snark_namespace_id: "000000000000000000000000000000000000707269736d5350457331"
Expand Down
Loading
Loading