Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 56 additions & 81 deletions crates/prism/src/da/celestia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use celestia_rpc::{BlobClient, Client, HeaderClient};
use celestia_types::{blob::GasPrice, nmt::Namespace, Blob};
use prism_common::operation::Operation;
use prism_errors::{DataAvailabilityError, GeneralError};
use std::{self, sync::Arc};
use tokio::{
use std::{
self,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
atomic::{AtomicU64, Ordering},
Arc,
},
task::spawn,
};
use tokio::{sync::broadcast, task::spawn};

use bincode;

Expand All @@ -35,14 +35,12 @@ pub struct CelestiaConnection {
pub snark_namespace: Namespace,
pub operation_namespace: Namespace,

sync_target_tx: Arc<Sender<u64>>,
sync_target_rx: Arc<Mutex<Receiver<u64>>>,
height_update_tx: broadcast::Sender<u64>,
sync_target: Arc<AtomicU64>,
}

impl CelestiaConnection {
pub async fn new(config: &CelestiaConfig, auth_token: Option<&str>) -> Result<Self> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let client = Client::new(&config.connection_string, auth_token)
.await
.context("Failed to initialize websocket connection")
Expand All @@ -61,12 +59,14 @@ impl CelestiaConnection {
None => snark_namespace,
};

let (height_update_tx, _) = broadcast::channel(100);

Ok(CelestiaConnection {
client,
snark_namespace,
operation_namespace,
sync_target_tx: Arc::new(tx),
sync_target_rx: Arc::new(Mutex::new(rx)),
height_update_tx,
sync_target: Arc::new(AtomicU64::new(0)),
})
}
}
Expand All @@ -86,41 +86,38 @@ fn create_namespace(namespace_hex: &str) -> Result<Namespace> {
#[async_trait]
impl DataAvailabilityLayer for CelestiaConnection {
async fn get_latest_height(&self) -> Result<u64> {
match self.sync_target_rx.lock().await.recv().await {
Some(height) => Ok(height),
None => Err(anyhow!(DataAvailabilityError::ChannelReceiveError)),
}
Ok(self.sync_target.load(Ordering::Relaxed))
}

async fn initialize_sync_target(&self) -> Result<u64> {
HeaderClient::header_network_head(&self.client)
let height = HeaderClient::header_network_head(&self.client)
.await
.context("Failed to get network head from DA layer")
.map(|extended_header| extended_header.header.height.value())
.map(|extended_header| extended_header.header.height.value())?;

self.sync_target.store(height, Ordering::Relaxed);
Ok(height)
}

async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
trace!("searching for epoch on da layer at height {}", height);

match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
match FinalizedEpoch::try_from(blob) {
Ok(epoch_json) => epochs.push(epoch_json),
Err(_) => {
GeneralError::ParsingError(format!(
"marshalling blob from height {} to epoch json: {:?}",
height, &blob
));
}
}
}
Ok(epochs)
}
Ok(blobs) => blobs
.into_iter()
.next()
.map(|blob| {
FinalizedEpoch::try_from(&blob).map_err(|_| {
anyhow!(GeneralError::ParsingError(format!(
"marshalling blob from height {} to epoch json: {:?}",
height, &blob
)))
})
})
.transpose(),
Err(err) => {
// todo: this is a hack to handle a retarded error from cel-node that will be fixed in v0.15.0
if err.to_string().contains("blob: not found") {
Ok(vec![])
Ok(None)
} else {
Err(anyhow!(DataAvailabilityError::DataRetrievalError(
height,
Expand All @@ -131,38 +128,22 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
if epochs.is_empty() {
bail!("no epochs provided for submission");
}
async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64> {
debug!("posting {}th epoch to da layer", epoch.height);

debug!("posting {} epochs to da layer", epochs.len());
let data = bincode::serialize(&epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;

let blobs: Result<Vec<Blob>, DataAvailabilityError> = epochs
.iter()
.map(|epoch| {
let data = bincode::serialize(epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;
Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}
let blob = Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
})?;

self.client
.blob_submit(&blobs, GasPrice::from(-1.0))
.blob_submit(&[blob], GasPrice::from(-1.0))
.await
.map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
}
Expand Down Expand Up @@ -230,35 +211,29 @@ impl DataAvailabilityLayer for CelestiaConnection {
.map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
}

fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
self.height_update_tx.subscribe()
}

async fn start(&self) -> Result<()> {
let mut header_sub = HeaderClient::header_subscribe(&self.client)
.await
.context("Failed to subscribe to headers from DA layer")
.map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))?;
.context("Failed to subscribe to headers from DA layer")?;

let sync_target = self.sync_target.clone();
let height_update_tx = self.height_update_tx.clone();

let synctarget_buffer = self.sync_target_tx.clone();
spawn(async move {
while let Some(extended_header_result) = header_sub.next().await {
match extended_header_result {
Ok(extended_header) => {
let height = extended_header.header.height.value();
match synctarget_buffer.send(height).await {
Ok(_) => {
debug!("sent sync target update for height {}", height);
}
Err(_) => {
DataAvailabilityError::SyncTargetError(format!(
"sending sync target update message for height {}",
height
));
}
}
sync_target.store(height, Ordering::Relaxed);
let _ = height_update_tx.send(height);
debug!("updated sync target for height {}", height);
}
Err(e) => {
DataAvailabilityError::NetworkError(format!(
"retrieving header from da layer: {}",
e
));
error!("Error retrieving header from DA layer: {}", e);
}
}
}
Expand Down
25 changes: 14 additions & 11 deletions crates/prism/src/da/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::da::{DataAvailabilityLayer, FinalizedEpoch};
use anyhow::Result;
use async_trait::async_trait;
use prism_common::operation::Operation;
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use tokio::{
sync::{broadcast, RwLock},
time::{interval, Duration},
Expand All @@ -12,14 +12,14 @@ use tokio::{
pub struct Block {
pub height: u64,
pub operations: Vec<Operation>,
pub epochs: Vec<FinalizedEpoch>,
pub epoch: Option<FinalizedEpoch>,
}

#[derive(Clone)]
pub struct InMemoryDataAvailabilityLayer {
blocks: Arc<RwLock<Vec<Block>>>,
pending_operations: Arc<RwLock<Vec<Operation>>>,
pending_epochs: Arc<RwLock<Vec<FinalizedEpoch>>>,
pending_epochs: Arc<RwLock<VecDeque<FinalizedEpoch>>>,
latest_height: Arc<RwLock<u64>>,
height_update_tx: broadcast::Sender<u64>,
block_update_tx: broadcast::Sender<Block>,
Expand All @@ -34,7 +34,7 @@ impl InMemoryDataAvailabilityLayer {
Self {
blocks: Arc::new(RwLock::new(Vec::new())),
pending_operations: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(VecDeque::new())),
latest_height: Arc::new(RwLock::new(0)),
height_update_tx: height_tx,
block_update_tx: block_tx,
Expand All @@ -58,13 +58,12 @@ impl InMemoryDataAvailabilityLayer {
let new_block = Block {
height: *latest_height,
operations: std::mem::take(&mut *pending_operations),
epochs: std::mem::take(&mut *pending_epochs),
epoch: pending_epochs.pop_front(),
};
debug!(
"new block produced at height {} with {} operations and {} snarks",
"new block produced at height {} with {} operations",
new_block.height,
new_block.operations.len(),
new_block.epochs.len()
);
blocks.push(new_block.clone());

Expand All @@ -81,6 +80,10 @@ impl InMemoryDataAvailabilityLayer {

#[async_trait]
impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
self.height_update_tx.subscribe()
}

async fn get_latest_height(&self) -> Result<u64> {
Ok(*self.latest_height.read().await)
}
Expand All @@ -89,18 +92,18 @@ impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
self.get_latest_height().await
}

async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
let blocks = self.blocks.read().await;
Ok(blocks
.iter()
.find(|block| block.height == height)
.map(|block| block.epochs.clone())
.map(|block| block.epoch.clone())
.unwrap_or_default())
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64> {
let mut pending_epochs = self.pending_epochs.write().await;
pending_epochs.extend(epochs);
pending_epochs.push_back(epoch);
self.get_latest_height().await
}

Expand Down
6 changes: 4 additions & 2 deletions crates/prism/src/da/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use prism_errors::GeneralError;
use serde::{Deserialize, Serialize};
use sp1_sdk::SP1ProofWithPublicValues;
use std::{self, str::FromStr};
use tokio::sync::broadcast;

pub mod celestia;
pub mod memory;
Expand Down Expand Up @@ -50,9 +51,10 @@ impl SignedContent for FinalizedEpoch {
pub trait DataAvailabilityLayer: Send + Sync {
async fn get_latest_height(&self) -> Result<u64>;
async fn initialize_sync_target(&self) -> Result<u64>;
async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>>;
async fn submit_snarks(&self, epoch: Vec<FinalizedEpoch>) -> Result<u64>;
async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>>;
async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64>;
async fn get_operations(&self, height: u64) -> Result<Vec<Operation>>;
async fn submit_operations(&self, operations: Vec<Operation>) -> Result<u64>;
async fn start(&self) -> Result<()>;
fn subscribe_to_heights(&self) -> broadcast::Receiver<u64>;
}
Loading
Loading