Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
501 changes: 337 additions & 164 deletions da-indexer/Cargo.lock

Large diffs are not rendered by default.

21 changes: 20 additions & 1 deletion da-indexer/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
DA Indexer Service
===

The DA Indexer service collects blobs from different DA solutions (currently only Celestia and EigenDA) and provides a convenient API for fetching blob data.
The DA Indexer service collects blobs from different DA solutions (currently only Celestia and EigenDA) and provides a convenient API for fetching blob data. In addition to indexing blobs, this service can be configured to fetch L2 batch metadata corresponding to a specific blob (currently only available for Celestia).

## Celestia
The Celestia indexer runs on top of the [Celestia light node](https://docs.celestia.org/nodes/light-node). It is worth noting that the indexer collects only blobs and some block metadata; it does not collect full blocks, transactions, etc.
Expand All @@ -23,6 +23,7 @@ The EigenDA indexer runs on top of the EigenDA disperser. It is worth mentioning
| DA_INDEXER__INDEXER__RETRY_INTERVAL | The delay between attempts to reprocess failed jobs | 180 seconds |
| DA_INDEXER__INDEXER__CATCHUP_INTERVAL | The delay between attempts to process missing jobs | 0 seconds |
| DA_INDEXER__DA__TYPE | "Celestia" or "EigenDA" | |
| DA_INDEXER__L2_ROUTER__ROUTES_PATH | Path to the routes config file | |


### Celestia
Expand All @@ -44,6 +45,24 @@ The EigenDA indexer runs on top of the EigenDA disperser. It is worth mentioning
| DA_INDEXER__INDEXER__DA__SAVE_BATCH_SIZE | The number of blobs to save per db transaction | |
| DA_INDEXER__INDEXER__DA__PRUNING_BLOCK_THRESHOLD | The threshold above which blobs might be unavailable | |

### L2 Batch Metadata
To fetch L2 batch metadata, the service must be aware of the L2s that use Celestia as a DA layer and the namespaces they utilize. This information is configured in a separate file, whose path is specified in the `DA_INDEXER__L2_ROUTER__ROUTES_PATH` environment variable. If that variable is set, the indexer and database configuration are optional. An example of the routes config is shown below:
```toml
[routes.0x00000000000000000000000000000000000000000008e5f679bf7116cb]
l2_chain_type = "Optimism"
l2_chain_id = 123420111
l2_api_url = "https://opcelestia-raspberry.gelatoscout.com/"
l2_blockscout_url = "https://opcelestia-raspberry.gelatoscout.com/"

[routes.0x00000000000000000000000000000000000000ca1de12a1f4dbe943b6b]
l2_chain_type = "Arbitrum"
l2_chain_id = 123
l2_api_url = "http://localhost:3001"
l2_blockscout_url = "http://arbitrum.blockscout.com"
l1_chain_id = 456 # optional
request_timeout = 30 # optional
request_retries = 2 # optional
```

## Dev

Expand Down
10 changes: 9 additions & 1 deletion da-indexer/da-indexer-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,27 @@ futures = "0.3"
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }
serde = "1.0"
serde_with = "3.6.1"
serde_json = "1.0.96"
async-trait = "0.1"
http = "0.2.9"
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
prost = "0.10"
ethabi = "18.0"
ethers = { version = "2.0.11", features = ["ws"] }
reqwest = { version = "0.12.5", features = ["json"] }
chrono = "0.4"
toml = "0.8.14"
reqwest-middleware = "0.3.3"
reqwest-retry = "0.6.1"

[dev-dependencies]
blockscout-service-launcher = { version = "0.9.0", features = ["test-database", "database-0_12"] }
pretty_assertions = "1.3"
da-indexer-migration = {path = "../da-indexer-migration"}
serde_json = "1.0"
base64 = "0.22.0"
wiremock = "0.6"
toml = "0.8.14"
base64 = "0.22.1"

[build-dependencies]
tonic-build = "0.7"
65 changes: 65 additions & 0 deletions da-indexer/da-indexer-logic/src/celestia/l2_router/arbitrum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use super::{
new_client,
types::{L2BatchMetadata, L2Config},
};
use anyhow::Result;
use blockscout_display_bytes::Bytes;
use chrono::DateTime;
use reqwest::{StatusCode, Url};
use serde::Deserialize;

/// Subset of the batch "commitment transaction" object returned by the
/// Arbitrum Blockscout API; only the fields this module reads are listed.
#[derive(Deserialize, Debug)]
struct CommitmentTransaction {
    /// L1 transaction hash, as the API's string representation.
    hash: String,
    /// Timestamp string; parsed with `DateTime::parse_from_rfc3339` below,
    /// so the API is expected to return RFC 3339.
    timestamp: String,
}

/// Arbitrum batch response (subset of fields consumed by `get_l2_batch`).
#[derive(Deserialize, Debug)]
struct L2BatchArbitrum {
    /// L1 transaction that committed this batch.
    commitment_transaction: CommitmentTransaction,
    /// Last L2 block covered by the batch.
    end_block: u64,
    /// Numeric batch id (exposed as `l2_batch_id` and used in the
    /// Blockscout batch URL).
    number: u64,
    /// First L2 block covered by the batch.
    start_block: u64,
    /// Number of L2 transactions in the batch.
    transactions_count: u64,
}

/// Fetches L2 batch metadata for a Celestia blob from an Arbitrum-flavored
/// Blockscout API.
///
/// Returns `Ok(None)` when the upstream API responds 404 (no batch known for
/// this blob); transport, decoding, and URL errors are propagated.
pub async fn get_l2_batch(
    config: &L2Config,
    height: u64,
    commitment: &[u8],
) -> Result<Option<L2BatchMetadata>> {
    // Hex-encoded ("0x…") commitment, used both in the request path and logs.
    let commitment = Bytes::from(commitment.to_vec()).to_string();
    let query = format!(
        "{}/api/v2/arbitrum/batches/da/celestia/{}/{}",
        config.l2_api_url, height, commitment,
    );

    let response = new_client(config)?.get(&query).send().await?;

    if response.status() == StatusCode::NOT_FOUND {
        // Fix: `commitment` is already a hex string here; running it through
        // `hex::encode` again (as before) logged the hex of its ASCII bytes.
        tracing::debug!(height, %commitment, "l2 batch metadata not found");
        return Ok(None);
    }
    let response: L2BatchArbitrum = response.json().await?;

    Ok(Some(L2BatchMetadata {
        chain_type: super::types::L2Type::Arbitrum,
        l2_chain_id: config.l2_chain_id,
        l2_batch_id: response.number.to_string(),
        l2_start_block: response.start_block,
        l2_end_block: response.end_block,
        l2_batch_tx_count: response.transactions_count as u32,
        l2_blockscout_url: Url::parse(&config.l2_blockscout_url)?
            .join(&format!("batches/{}", response.number))?
            .to_string(),
        l1_tx_hash: response.commitment_transaction.hash,
        // The API returns an RFC 3339 timestamp; store it as unix seconds.
        l1_tx_timestamp: DateTime::parse_from_rfc3339(&response.commitment_transaction.timestamp)?
            .timestamp() as u64,
        l1_chain_id: config.l1_chain_id,
        related_blobs: vec![], // Arbitrum indexer doesn't support related blobs
    }))
}
71 changes: 71 additions & 0 deletions da-indexer/da-indexer-logic/src/celestia/l2_router/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
mod arbitrum;
mod optimism;
pub mod settings;
pub mod types;

use anyhow::Result;
use blockscout_display_bytes::ToHex;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::{Deserialize, Serialize};
use settings::L2RouterSettings;
use std::{collections::HashMap, fs};
use types::{L2BatchMetadata, L2Config, L2Type};

/// Routing table mapping a hex-encoded Celestia namespace to the L2 config
/// used to resolve batch metadata for blobs in that namespace.
#[derive(Serialize, Deserialize)]
pub struct L2Router {
    /// Keys are namespaces as hex strings (see `ToHex::to_hex` usage below).
    pub routes: HashMap<String, L2Config>,
}

impl L2Router {
    /// Wraps an already-built namespace -> config map in a router.
    pub fn new(routes: HashMap<String, L2Config>) -> Result<Self> {
        Ok(Self { routes })
    }

    /// Reads and parses the TOML routes file referenced by `settings`,
    /// logging every registered route.
    pub fn from_settings(settings: L2RouterSettings) -> Result<Self> {
        let raw = fs::read_to_string(&settings.routes_path).map_err(|err| {
            anyhow::anyhow!(
                "failed to read routes file from path {}: {}",
                settings.routes_path,
                err
            )
        })?;
        let router: L2Router = toml::from_str(&raw)?;
        for (namespace, config) in router.routes.iter() {
            tracing::info!("registered route: {} -> {:?}", namespace, config);
        }
        Ok(router)
    }

    /// Resolves the route for `namespace` and queries the corresponding L2
    /// API for the batch containing the blob at (`height`, `commitment`).
    /// Returns `Ok(None)` when the namespace has no registered route.
    pub async fn get_l2_batch_metadata(
        &self,
        height: u64,
        namespace: &[u8],
        commitment: &[u8],
    ) -> Result<Option<L2BatchMetadata>> {
        // Routes are keyed by the hex representation of the namespace.
        let namespace = ToHex::to_hex(&namespace);
        let config = if let Some(config) = self.routes.get(&namespace) {
            config
        } else {
            tracing::debug!("unknown namespace: {}", &namespace);
            return Ok(None);
        };

        match config.l2_chain_type {
            L2Type::Optimism => optimism::get_l2_batch(config, height, commitment).await,
            L2Type::Arbitrum => arbitrum::get_l2_batch(config, height, commitment).await,
        }
    }
}

/// Builds an HTTP client with the per-route timeout and an exponential
/// backoff retry policy capped at `config.request_retries` attempts.
pub fn new_client(config: &L2Config) -> Result<ClientWithMiddleware> {
    let base_client = reqwest::Client::builder()
        .timeout(config.request_timeout)
        .build()?;
    let retry_middleware = RetryTransientMiddleware::new_with_policy(
        ExponentialBackoff::builder().build_with_max_retries(config.request_retries),
    );
    Ok(ClientBuilder::new(base_client)
        .with(retry_middleware)
        .build())
}
86 changes: 86 additions & 0 deletions da-indexer/da-indexer-logic/src/celestia/l2_router/optimism.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use super::{new_client, types::L2BatchMetadata, L2Config};
use anyhow::{anyhow, Result};
use blockscout_display_bytes::Bytes;
use chrono::DateTime;
use reqwest::{StatusCode, Url};
use serde::{Deserialize, Serialize};

/// One Celestia blob entry in the Optimism batch response; identifies the
/// blob and the L1 transaction that carried its commitment.
#[derive(Serialize, Deserialize, Debug)]
struct Blob {
    /// Blob commitment as a hex string; matched against the request's
    /// commitment in `get_l2_batch`.
    commitment: String,
    /// Celestia block height the blob was included at.
    height: u64,
    // Present in the API response; not consumed by this module.
    l1_timestamp: String,
    /// L1 transaction hash for the blob's commitment.
    l1_transaction_hash: String,
    /// Celestia namespace of the blob, as the API's string representation.
    namespace: String,
}

/// Optimism batch response (subset of fields consumed by `get_l2_batch`).
#[derive(Serialize, Deserialize, Debug)]
pub struct L2BatchOptimism {
    // Present in the API response; not consumed by this module.
    batch_data_container: String,
    /// All Celestia blobs belonging to this batch, including the one the
    /// request was made for.
    blobs: Vec<Blob>,
    /// Numeric batch id (exposed as `l2_batch_id` and used in the
    /// Blockscout batch URL).
    internal_id: u64,
    /// Timestamp string; parsed with `DateTime::parse_from_rfc3339` below,
    /// so the API is expected to return RFC 3339.
    l1_timestamp: String,
    // Present in the API response; not consumed by this module.
    l1_tx_hashes: Vec<String>,
    /// First L2 block covered by the batch.
    l2_block_start: u64,
    /// Last L2 block covered by the batch.
    l2_block_end: u64,
    /// Number of L2 transactions in the batch.
    tx_count: u64,
}

/// Fetches L2 batch metadata for a Celestia blob from an Optimism-flavored
/// Blockscout API.
///
/// Returns `Ok(None)` when the upstream API responds 404 (no batch known for
/// this blob). Errors if the response's blob list contains no entry matching
/// the requested commitment, since that entry supplies the L1 tx hash.
pub async fn get_l2_batch(
    config: &L2Config,
    height: u64,
    commitment: &[u8],
) -> Result<Option<L2BatchMetadata>> {
    // Hex-encoded ("0x…") commitment, used in the request path, in logs, and
    // to match against the blob list in the response.
    let commitment = Bytes::from(commitment.to_vec()).to_string();
    let query = format!(
        "{}/api/v2/optimism/batches/da/celestia/{}/{}",
        config.l2_api_url, height, commitment,
    );

    let response = new_client(config)?.get(&query).send().await?;

    if response.status() == StatusCode::NOT_FOUND {
        // Fix: `commitment` is already a hex string here; running it through
        // `hex::encode` again (as before) logged the hex of its ASCII bytes.
        tracing::debug!(height, %commitment, "l2 batch metadata not found");
        return Ok(None);
    }
    let response: L2BatchOptimism = response.json().await?;

    // Single pass over the blobs: the first entry matching the requested
    // commitment yields the L1 transaction hash (avoiding the previous
    // find-then-clone double pass); every non-matching entry is a related
    // blob. Entries duplicating the matched commitment are dropped, matching
    // the previous `filter(!= commitment)` behavior.
    let mut l1_tx_hash: Option<String> = None;
    let mut related_blobs = Vec::with_capacity(response.blobs.len().saturating_sub(1));
    for blob in response.blobs.into_iter() {
        if blob.commitment == commitment {
            l1_tx_hash.get_or_insert(blob.l1_transaction_hash);
        } else {
            related_blobs.push(super::types::CelestiaBlobId {
                height: blob.height,
                namespace: blob.namespace,
                commitment: blob.commitment,
            });
        }
    }
    let l1_tx_hash = l1_tx_hash.ok_or_else(|| anyhow!("l1 transaction hash not found"))?;

    Ok(Some(L2BatchMetadata {
        chain_type: super::types::L2Type::Optimism,
        l2_chain_id: config.l2_chain_id,
        l2_batch_id: response.internal_id.to_string(),
        l2_start_block: response.l2_block_start,
        l2_end_block: response.l2_block_end,
        l2_batch_tx_count: response.tx_count as u32,
        l2_blockscout_url: Url::parse(&config.l2_blockscout_url)?
            .join(&format!("batches/{}", response.internal_id))?
            .to_string(),
        l1_tx_hash,
        // The API returns an RFC 3339 timestamp; store it as unix seconds.
        l1_tx_timestamp: DateTime::parse_from_rfc3339(&response.l1_timestamp)?.timestamp() as u64,
        l1_chain_id: config.l1_chain_id,
        related_blobs,
    }))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use serde::Deserialize;

/// Settings for the L2 router; deserialized from the service configuration.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct L2RouterSettings {
    /// Filesystem path to the TOML routes file mapping Celestia namespaces
    /// to L2 route configs.
    pub routes_path: String,
}
54 changes: 54 additions & 0 deletions da-indexer/da-indexer-logic/src/celestia/l2_router/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::time;

use serde::{Deserialize, Serialize};
use serde_with::serde_as;

/// L2 chain families supported by the router; selects which API client
/// module (`optimism` or `arbitrum`) handles a route.
///
/// Fieldless public enum: derive `Clone`/`Copy` so values can be passed by
/// value, and `Eq` to complete the existing `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum L2Type {
    Optimism,
    Arbitrum,
}

/// Per-namespace route configuration: which L2 family the namespace belongs
/// to and how to reach its API / Blockscout instance.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct L2Config {
    /// Selects the API client module used to fetch batch metadata.
    pub l2_chain_type: L2Type,
    /// Chain id of the L2 itself.
    pub l2_chain_id: u32,
    /// Base URL of the L2 API that batch queries are sent to.
    pub l2_api_url: String,
    /// Base URL used to build user-facing batch links (`batches/{id}`).
    pub l2_blockscout_url: String,
    /// Optional chain id of the L1 the batches settle on.
    pub l1_chain_id: Option<u32>,
    /// Per-request HTTP timeout; configured as whole seconds in the TOML
    /// file, defaults to 5 seconds.
    #[serde(default = "default_request_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    pub request_timeout: time::Duration,
    /// Maximum number of HTTP retries (exponential backoff); defaults to 1.
    #[serde(default = "default_request_retries")]
    pub request_retries: u32,
}

/// Default `request_timeout` for routes that don't set one: 5 seconds.
fn default_request_timeout() -> time::Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    time::Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}

/// Default `request_retries` for routes that don't set one: a single retry.
fn default_request_retries() -> u32 {
    const DEFAULT_RETRIES: u32 = 1;
    DEFAULT_RETRIES
}

/// Identifier of a Celestia blob related to an L2 batch.
///
/// Namespace and commitment are kept as the string representations returned
/// by the L2 API. Public type: derive `Debug` for diagnostics plus
/// `Clone`/`PartialEq`/`Eq` so callers can copy and compare ids.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CelestiaBlobId {
    /// Celestia namespace of the blob.
    pub namespace: String,
    /// Celestia block height the blob was included at.
    pub height: u64,
    /// Blob commitment.
    pub commitment: String,
}

/// L2 batch metadata assembled from an L2 Blockscout API response for a
/// single Celestia blob.
pub struct L2BatchMetadata {
    /// Which L2 family produced this batch.
    pub chain_type: L2Type,
    /// Chain id of the L2.
    pub l2_chain_id: u32,
    /// Batch identifier (a numeric id rendered as a string).
    pub l2_batch_id: String,
    /// First L2 block covered by the batch.
    pub l2_start_block: u64,
    /// Last L2 block covered by the batch.
    pub l2_end_block: u64,
    /// Number of L2 transactions in the batch.
    pub l2_batch_tx_count: u32,
    /// User-facing Blockscout link to the batch (`…/batches/{id}`).
    pub l2_blockscout_url: String,
    /// L1 transaction that committed the batch (or this blob's commitment).
    pub l1_tx_hash: String,
    /// Unix timestamp (seconds) of the L1 transaction.
    pub l1_tx_timestamp: u64,
    /// Optional chain id of the L1 the batch settles on.
    pub l1_chain_id: Option<u32>,
    /// Other Celestia blobs belonging to the same batch; always empty for
    /// Arbitrum (its indexer doesn't report related blobs).
    pub related_blobs: Vec<CelestiaBlobId>,
}
1 change: 1 addition & 0 deletions da-indexer/da-indexer-logic/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod da;
pub mod job;
pub mod l2_router;
mod parser;
pub mod repository;
mod rpc_client;
Expand Down
Loading
Loading