Skip to content

Commit 6f40512

Browse files
AllFirimrakhimov
andauthored
feat(da_indexer): add l2BatchMetadata endpoint (#999)
* Implement Celestia L2 router * Minor fixes * Update README.md * Add optional l1_chain_id, update bs urls * Update Cargo.lock * Update Cargo.lock * Refactor code * Update da-indexer/da-indexer-logic/src/celestia/l2_router/mod.rs Co-authored-by: Rim Rakhimov <rimrakhimov@gmail.com> * Update da-indexer/da-indexer-logic/src/celestia/l2_router/mod.rs Co-authored-by: Rim Rakhimov <rimrakhimov@gmail.com> * Fix review comments --------- Co-authored-by: Rim Rakhimov <rimrakhimov@gmail.com>
1 parent 6f1cf03 commit 6f40512

File tree

27 files changed

+1122
-229
lines changed

27 files changed

+1122
-229
lines changed

da-indexer/Cargo.lock

Lines changed: 337 additions & 164 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

da-indexer/README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
DA Indexer Service
22
===
33

4-
The DA Indexer service collects blobs from different DA solutions (currently only Celestia and EigenDA) and provides a convenient API for fetching blob data.
4+
The DA Indexer service collects blobs from different DA solutions (currently only Celestia and EigenDA) and provides a convenient API for fetching blob data. In addition to indexing blobs, this service can be configured to fetch L2 batch metadata corresponding to a specific blob (currently only available for Celestia).
55

66
## Celestia
77
The Celestia indexer runs on top of the [Celestia light node](https://docs.celestia.org/nodes/light-node). It is worth noting that the indexer collects only blobs and some block metadata, it does not collect full blocks, transactions, etc.
@@ -23,6 +23,7 @@ The EigenDA indexer runs on top of the EigenDA disperser. It is worth mentioning
2323
| DA_INDEXER__INDEXER__RETRY_INTERVAL | The delay between attempts to reprocess failed jobs | 180 seconds |
2424
| DA_INDEXER__INDEXER__CATCHUP_INTERVAL | The delay between attempts to process missing jobs | 0 seconds |
2525
| DA_INDEXER__DA__TYPE | "Celestia" or "EigenDA" | |
26+
| DA_INDEXER__L2_ROUTER__ROUTES_PATH | Path to the routes config file | |
2627

2728

2829
### Celestia
@@ -44,6 +45,24 @@ The EigenDA indexer runs on top of the EigenDA disperser. It is worth mentioning
4445
| DA_INDEXER__INDEXER__DA__SAVE_BATCH_SIZE | The number of blobs to save per db transaction | |
4546
| DA_INDEXER__INDEXER__DA__PRUNING_BLOCK_THRESHOLD | The threshold above which blobs might be unavailable | |
4647

48+
### L2 Batch Metadata
49+
To fetch L2 batch metadata, the service must be aware of the L2s that use Celestia as a DA layer and the namespaces they utilize. This information is configured in a separate file, with its path specified in the `DA_INDEXER__L2_ROUTER__ROUTES_PATH` environment variable. Indexer and database configuration are optional if the `DA_INDEXER__L2_ROUTER__ROUTES_PATH` environment variable is set. An example of the routes config is shown below:
50+
```toml
51+
[routes.0x00000000000000000000000000000000000000000008e5f679bf7116cb]
52+
l2_chain_type = "Optimism"
53+
l2_chain_id = 123420111
54+
l2_api_url = "https://opcelestia-raspberry.gelatoscout.com/"
55+
l2_blockscout_url = "https://opcelestia-raspberry.gelatoscout.com/"
56+
57+
[routes.0x00000000000000000000000000000000000000ca1de12a1f4dbe943b6b]
58+
l2_chain_type = "Arbitrum"
59+
l2_chain_id = 123
60+
l2_api_url = "http://localhost:3001"
61+
l2_blockscout_url = "http://arbitrum.blockscout.com"
62+
l1_chain_id = 456 # optional
63+
request_timeout = 30 # optional
64+
request_retries = 2 # optional
65+
```
4766

4867
## Dev
4968

da-indexer/da-indexer-logic/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,27 @@ futures = "0.3"
2727
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }
2828
serde = "1.0"
2929
serde_with = "3.6.1"
30+
serde_json = "1.0.96"
3031
async-trait = "0.1"
3132
http = "0.2.9"
3233
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
3334
prost = "0.10"
3435
ethabi = "18.0"
3536
ethers = { version = "2.0.11", features = ["ws"] }
37+
reqwest = { version = "0.12.5", features = ["json"] }
38+
chrono = "0.4"
39+
toml = "0.8.14"
40+
reqwest-middleware = "0.3.3"
41+
reqwest-retry = "0.6.1"
3642

3743
[dev-dependencies]
3844
blockscout-service-launcher = { version = "0.9.0", features = ["test-database", "database-0_12"] }
3945
pretty_assertions = "1.3"
4046
da-indexer-migration = {path = "../da-indexer-migration"}
4147
serde_json = "1.0"
42-
base64 = "0.22.0"
48+
wiremock = "0.6"
49+
toml = "0.8.14"
50+
base64 = "0.22.1"
4351

4452
[build-dependencies]
4553
tonic-build = "0.7"
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use super::{
2+
new_client,
3+
types::{L2BatchMetadata, L2Config},
4+
};
5+
use anyhow::Result;
6+
use blockscout_display_bytes::Bytes;
7+
use chrono::DateTime;
8+
use reqwest::{StatusCode, Url};
9+
use serde::Deserialize;
10+
11+
#[derive(Deserialize, Debug)]
12+
struct CommitmentTransaction {
13+
hash: String,
14+
timestamp: String,
15+
}
16+
17+
#[derive(Deserialize, Debug)]
18+
struct L2BatchArbitrum {
19+
commitment_transaction: CommitmentTransaction,
20+
end_block: u64,
21+
number: u64,
22+
start_block: u64,
23+
transactions_count: u64,
24+
}
25+
26+
pub async fn get_l2_batch(
27+
config: &L2Config,
28+
height: u64,
29+
commitment: &[u8],
30+
) -> Result<Option<L2BatchMetadata>> {
31+
let commitment = Bytes::from(commitment.to_vec()).to_string();
32+
let query = format!(
33+
"{}/api/v2/arbitrum/batches/da/celestia/{}/{}",
34+
config.l2_api_url, height, commitment,
35+
);
36+
37+
let response = new_client(config)?.get(&query).send().await?;
38+
39+
if response.status() == StatusCode::NOT_FOUND {
40+
tracing::debug!(
41+
height,
42+
commitment = hex::encode(&commitment),
43+
"l2 batch metadata not found"
44+
);
45+
return Ok(None);
46+
}
47+
let response: L2BatchArbitrum = response.json().await?;
48+
49+
Ok(Some(L2BatchMetadata {
50+
chain_type: super::types::L2Type::Arbitrum,
51+
l2_chain_id: config.l2_chain_id,
52+
l2_batch_id: response.number.to_string(),
53+
l2_start_block: response.start_block,
54+
l2_end_block: response.end_block,
55+
l2_batch_tx_count: response.transactions_count as u32,
56+
l2_blockscout_url: Url::parse(&config.l2_blockscout_url)?
57+
.join(&format!("batches/{}", response.number))?
58+
.to_string(),
59+
l1_tx_hash: response.commitment_transaction.hash,
60+
l1_tx_timestamp: DateTime::parse_from_rfc3339(&response.commitment_transaction.timestamp)?
61+
.timestamp() as u64,
62+
l1_chain_id: config.l1_chain_id,
63+
related_blobs: vec![], // Arbitrum indexer doesn't support related blobs
64+
}))
65+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
mod arbitrum;
2+
mod optimism;
3+
pub mod settings;
4+
pub mod types;
5+
6+
use anyhow::Result;
7+
use blockscout_display_bytes::ToHex;
8+
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
9+
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
10+
use serde::{Deserialize, Serialize};
11+
use settings::L2RouterSettings;
12+
use std::{collections::HashMap, fs};
13+
use types::{L2BatchMetadata, L2Config, L2Type};
14+
15+
#[derive(Serialize, Deserialize)]
16+
pub struct L2Router {
17+
pub routes: HashMap<String, L2Config>,
18+
}
19+
20+
impl L2Router {
21+
pub fn new(routes: HashMap<String, L2Config>) -> Result<Self> {
22+
Ok(Self { routes })
23+
}
24+
25+
pub fn from_settings(settings: L2RouterSettings) -> Result<Self> {
26+
let routes = fs::read_to_string(&settings.routes_path).map_err(|err| {
27+
anyhow::anyhow!(
28+
"failed to read routes file from path {}: {}",
29+
settings.routes_path,
30+
err
31+
)
32+
})?;
33+
let router: L2Router = toml::from_str(&routes)?;
34+
router.routes.iter().for_each(|(namespace, config)| {
35+
tracing::info!("registered route: {} -> {:?}", namespace, config);
36+
});
37+
Ok(router)
38+
}
39+
40+
pub async fn get_l2_batch_metadata(
41+
&self,
42+
height: u64,
43+
namespace: &[u8],
44+
commitment: &[u8],
45+
) -> Result<Option<L2BatchMetadata>> {
46+
let namespace = ToHex::to_hex(&namespace);
47+
let config = match self.routes.get(&namespace) {
48+
Some(config) => config,
49+
None => {
50+
tracing::debug!("unknown namespace: {}", &namespace);
51+
return Ok(None);
52+
}
53+
};
54+
55+
match config.l2_chain_type {
56+
L2Type::Optimism => optimism::get_l2_batch(config, height, commitment).await,
57+
L2Type::Arbitrum => arbitrum::get_l2_batch(config, height, commitment).await,
58+
}
59+
}
60+
}
61+
62+
pub fn new_client(config: &L2Config) -> Result<ClientWithMiddleware> {
63+
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(config.request_retries);
64+
Ok(ClientBuilder::new(
65+
reqwest::Client::builder()
66+
.timeout(config.request_timeout)
67+
.build()?,
68+
)
69+
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
70+
.build())
71+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use super::{new_client, types::L2BatchMetadata, L2Config};
2+
use anyhow::{anyhow, Result};
3+
use blockscout_display_bytes::Bytes;
4+
use chrono::DateTime;
5+
use reqwest::{StatusCode, Url};
6+
use serde::{Deserialize, Serialize};
7+
8+
#[derive(Serialize, Deserialize, Debug)]
9+
struct Blob {
10+
commitment: String,
11+
height: u64,
12+
l1_timestamp: String,
13+
l1_transaction_hash: String,
14+
namespace: String,
15+
}
16+
17+
#[derive(Serialize, Deserialize, Debug)]
18+
pub struct L2BatchOptimism {
19+
batch_data_container: String,
20+
blobs: Vec<Blob>,
21+
internal_id: u64,
22+
l1_timestamp: String,
23+
l1_tx_hashes: Vec<String>,
24+
l2_block_start: u64,
25+
l2_block_end: u64,
26+
tx_count: u64,
27+
}
28+
29+
pub async fn get_l2_batch(
30+
config: &L2Config,
31+
height: u64,
32+
commitment: &[u8],
33+
) -> Result<Option<L2BatchMetadata>> {
34+
let commitment = Bytes::from(commitment.to_vec()).to_string();
35+
let query = format!(
36+
"{}/api/v2/optimism/batches/da/celestia/{}/{}",
37+
config.l2_api_url, height, commitment,
38+
);
39+
40+
let response = new_client(config)?.get(&query).send().await?;
41+
42+
if response.status() == StatusCode::NOT_FOUND {
43+
tracing::debug!(
44+
height,
45+
commitment = hex::encode(&commitment),
46+
"l2 batch metadata not found"
47+
);
48+
return Ok(None);
49+
}
50+
let mut response: L2BatchOptimism = response.json().await?;
51+
52+
let l1_tx_hash = response
53+
.blobs
54+
.iter()
55+
.find(|blob| blob.commitment == commitment)
56+
.ok_or(anyhow!("l1 transaction hash not found"))?
57+
.l1_transaction_hash
58+
.clone();
59+
60+
let related_blobs = response
61+
.blobs
62+
.drain(..)
63+
.filter(|blob| blob.commitment != commitment)
64+
.map(|blob| super::types::CelestiaBlobId {
65+
height: blob.height,
66+
namespace: blob.namespace,
67+
commitment: blob.commitment,
68+
})
69+
.collect();
70+
71+
Ok(Some(L2BatchMetadata {
72+
chain_type: super::types::L2Type::Optimism,
73+
l2_chain_id: config.l2_chain_id,
74+
l2_batch_id: response.internal_id.to_string(),
75+
l2_start_block: response.l2_block_start,
76+
l2_end_block: response.l2_block_end,
77+
l2_batch_tx_count: response.tx_count as u32,
78+
l2_blockscout_url: Url::parse(&config.l2_blockscout_url)?
79+
.join(&format!("batches/{}", response.internal_id))?
80+
.to_string(),
81+
l1_tx_hash,
82+
l1_tx_timestamp: DateTime::parse_from_rfc3339(&response.l1_timestamp)?.timestamp() as u64,
83+
l1_chain_id: config.l1_chain_id,
84+
related_blobs,
85+
}))
86+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use serde::Deserialize;
2+
3+
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
4+
#[serde(deny_unknown_fields)]
5+
pub struct L2RouterSettings {
6+
pub routes_path: String,
7+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::time;
2+
3+
use serde::{Deserialize, Serialize};
4+
use serde_with::serde_as;
5+
6+
#[derive(Debug, PartialEq, Serialize, Deserialize)]
7+
pub enum L2Type {
8+
Optimism,
9+
Arbitrum,
10+
}
11+
12+
#[serde_as]
13+
#[derive(Serialize, Deserialize, Debug)]
14+
#[serde(deny_unknown_fields)]
15+
pub struct L2Config {
16+
pub l2_chain_type: L2Type,
17+
pub l2_chain_id: u32,
18+
pub l2_api_url: String,
19+
pub l2_blockscout_url: String,
20+
pub l1_chain_id: Option<u32>,
21+
#[serde(default = "default_request_timeout")]
22+
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
23+
pub request_timeout: time::Duration,
24+
#[serde(default = "default_request_retries")]
25+
pub request_retries: u32,
26+
}
27+
28+
fn default_request_timeout() -> time::Duration {
29+
time::Duration::from_secs(5)
30+
}
31+
32+
fn default_request_retries() -> u32 {
33+
1
34+
}
35+
36+
pub struct CelestiaBlobId {
37+
pub namespace: String,
38+
pub height: u64,
39+
pub commitment: String,
40+
}
41+
42+
pub struct L2BatchMetadata {
43+
pub chain_type: L2Type,
44+
pub l2_chain_id: u32,
45+
pub l2_batch_id: String,
46+
pub l2_start_block: u64,
47+
pub l2_end_block: u64,
48+
pub l2_batch_tx_count: u32,
49+
pub l2_blockscout_url: String,
50+
pub l1_tx_hash: String,
51+
pub l1_tx_timestamp: u64,
52+
pub l1_chain_id: Option<u32>,
53+
pub related_blobs: Vec<CelestiaBlobId>,
54+
}

da-indexer/da-indexer-logic/src/celestia/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod da;
22
pub mod job;
3+
pub mod l2_router;
34
mod parser;
45
pub mod repository;
56
mod rpc_client;

0 commit comments

Comments
 (0)