Skip to content

Commit e9f298c

Browse files
committed
feat(sv-dune): Add max_blocks_to_store
1 parent 1199501 commit e9f298c

File tree

4 files changed

+114
-44
lines changed

4 files changed

+114
-44
lines changed

services/dune/src/cli.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,11 @@ pub struct Cli {
3131
help = "Type of storage to use. Options are 'S3' or 'File'."
3232
)]
3333
pub storage_type: String,
34+
35+
#[arg(
36+
long,
37+
value_name = "MAX_BLOCKS_TO_STORE",
38+
env = "MAX_BLOCKS_TO_STORE"
39+
)]
40+
pub max_blocks_to_store: Option<u16>,
3441
}

services/dune/src/helpers/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod avro;
22
mod batch_calculator;
3+
mod store;
34
mod test_helpers;
45

56
pub use avro::*;
67
pub use batch_calculator::*;
8+
pub use store::*;
79
pub use test_helpers::*;

services/dune/src/helpers/store.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::path::Path;
2+
3+
use anyhow::Result;
4+
use bytes::Bytes;
5+
use fuel_data_parser::DataEncoder;
6+
use fuel_streams_domains::blocks::Block;
7+
use fuel_streams_types::BlockHeight;
8+
use surrealkv::Store as SurrealStore;
9+
10+
pub struct Store {
11+
inner: SurrealStore,
12+
}
13+
14+
impl Store {
15+
pub fn new(storage_dir: Option<&str>) -> Result<Self> {
16+
let mut opts = surrealkv::Options::new();
17+
let storage_dir = match storage_dir {
18+
Some(file_dir) => Path::new(file_dir).to_path_buf(),
19+
None => {
20+
let manifest_dir = env!("CARGO_MANIFEST_DIR");
21+
let manifest_dir =
22+
Path::new(manifest_dir).join("output/storage");
23+
manifest_dir.to_path_buf()
24+
}
25+
};
26+
opts.dir = storage_dir;
27+
opts.disk_persistence = true;
28+
let store = SurrealStore::new(opts)?;
29+
Ok(Self { inner: store })
30+
}
31+
32+
pub async fn get_last_block_saved(&self) -> Result<BlockHeight> {
33+
let block_height = {
34+
let mut txn = self.inner.begin()?;
35+
let key = Bytes::from("last_block_saved");
36+
match txn.get(&key)? {
37+
Some(value) => serde_json::from_slice(&value)?,
38+
None => BlockHeight::from(0),
39+
}
40+
};
41+
Ok(block_height)
42+
}
43+
44+
pub async fn save_last_block(&self, block: &Block) -> Result<()> {
45+
let mut txn = self.inner.begin()?;
46+
let key = Bytes::from("last_block_saved");
47+
let value = block.height.encode_json()?;
48+
txn.set(&key, &value)?;
49+
txn.commit()?;
50+
Ok(())
51+
}
52+
53+
pub async fn get_total_blocks(&self) -> Result<u16> {
54+
let total = {
55+
let mut txn = self.inner.begin()?;
56+
let key = Bytes::from("blocks_saved");
57+
match txn.get(&key)? {
58+
Some(value) => serde_json::from_slice(&value)?,
59+
None => 0u16,
60+
}
61+
};
62+
Ok(total)
63+
}
64+
65+
pub async fn save_total_blocks(&self, blocks_num: u16) -> Result<()> {
66+
let current = self.get_total_blocks().await?;
67+
let mut txn = self.inner.begin()?;
68+
let key = Bytes::from("blocks_saved");
69+
let new_total = current + blocks_num;
70+
let value = serde_json::to_vec(&new_total)?;
71+
txn.set(&key, &value)?;
72+
txn.commit()?;
73+
Ok(())
74+
}
75+
76+
pub async fn should_continue_processing(
77+
&self,
78+
max_blocks: Option<u16>,
79+
) -> Result<bool> {
80+
if let Some(max) = max_blocks {
81+
let total = self.get_total_blocks().await?;
82+
Ok(total < max)
83+
} else {
84+
Ok(true)
85+
}
86+
}
87+
}

services/dune/src/main.rs

Lines changed: 18 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use std::{path::Path, sync::Arc, time::Instant};
1+
use std::{sync::Arc, time::Instant};
22

33
use anyhow::Result;
4-
use bytes::Bytes;
54
use clap::Parser;
65
use fuel_data_parser::DataEncoder;
76
use fuel_streams_domains::{
@@ -11,9 +10,8 @@ use fuel_streams_domains::{
1110
};
1211
use fuel_streams_types::BlockHeight;
1312
use fuel_web_utils::{shutdown::ShutdownController, tracing::init_tracing};
14-
use surrealkv::{Options, Store};
1513
use sv_dune::{
16-
helpers::BatchCalculator,
14+
helpers::{BatchCalculator, Store},
1715
processor::BlocksProcessor,
1816
Cli,
1917
DuneError,
@@ -32,7 +30,7 @@ async fn main() -> Result<()> {
3230

3331
let opts = QueryOptions::default();
3432
let last_height_indexed = Block::find_last_block_height(&db, &opts).await?;
35-
let last_height_saved = get_last_block_saved(&store).await?;
33+
let last_height_saved = store.get_last_block_saved().await?;
3634
if *last_height_saved >= *last_height_indexed {
3735
tracing::info!("Last block saved is up to date");
3836
return Ok(());
@@ -71,19 +69,7 @@ async fn setup_db(db_url: &str) -> Result<Arc<Db>, DuneError> {
7169
}
7270

7371
fn setup_store(cli: &Cli) -> Result<Store> {
74-
let mut opts = Options::new();
75-
let storage_dir = match cli.storage_file_dir.as_ref() {
76-
Some(file_dir) => Path::new(file_dir).to_path_buf(),
77-
None => {
78-
let manifest_dir = env!("CARGO_MANIFEST_DIR");
79-
let manifest_dir = Path::new(manifest_dir).join("output/storage");
80-
manifest_dir.to_path_buf()
81-
}
82-
};
83-
opts.dir = storage_dir.to_owned();
84-
opts.disk_persistence = true;
85-
let store = Store::new(opts)?;
86-
Ok(store)
72+
Store::new(cli.storage_file_dir.as_deref())
8773
}
8874

8975
async fn process_blocks(
@@ -112,6 +98,18 @@ async fn process_blocks(
11298
break;
11399
}
114100

101+
// Check if we should continue processing based on max_blocks
102+
if !store
103+
.should_continue_processing(cli.max_blocks_to_store)
104+
.await?
105+
{
106+
tracing::info!(
107+
"Reached maximum blocks to store ({}), stopping processing",
108+
cli.max_blocks_to_store.unwrap()
109+
);
110+
break;
111+
}
112+
115113
let start_time = Instant::now();
116114
let batch_end = std::cmp::min(
117115
current_start + (BATCH_SIZE - 1) as u32,
@@ -131,7 +129,8 @@ async fn process_blocks(
131129

132130
process_batch(&processor, &blocks_and_txs, shutdown).await?;
133131
if let Some((block, _)) = blocks_and_txs.last() {
134-
save_last_block(store, block).await?;
132+
store.save_last_block(block).await?;
133+
store.save_total_blocks(blocks_and_txs.len() as u16).await?;
135134
}
136135
tracing::info!("Batch processed in {:?}", start_time.elapsed());
137136
current_start = batch_end + 1;
@@ -230,31 +229,6 @@ async fn get_initial_height(
230229
}
231230
}
232231

233-
async fn get_last_block_saved(store: &Store) -> anyhow::Result<BlockHeight> {
234-
let block_height = {
235-
let mut txn = store.begin()?;
236-
let key = Bytes::from("last_block_saved");
237-
match txn.get(&key)? {
238-
Some(value) => {
239-
Ok::<BlockHeight, DuneError>(serde_json::from_slice(&value)?)
240-
}
241-
None => Ok(BlockHeight::from(0)),
242-
}
243-
}?;
244-
Ok(block_height)
245-
}
246-
247-
async fn save_last_block(store: &Store, block: &Block) -> Result<()> {
248-
{
249-
let mut txn = store.begin()?;
250-
let key = Bytes::from("last_block_saved");
251-
let value = block.height.encode_json()?;
252-
txn.set(&key, &value)?;
253-
txn.commit()?;
254-
}
255-
Ok(())
256-
}
257-
258232
async fn get_blocks_and_transactions(
259233
db: &Arc<Db>,
260234
start_height: BlockHeight,

0 commit comments

Comments
 (0)