diff --git a/services/publisher/src/cli.rs b/services/publisher/src/cli.rs index 2c230c0e..cfd1e5fb 100644 --- a/services/publisher/src/cli.rs +++ b/services/publisher/src/cli.rs @@ -55,4 +55,13 @@ pub struct Cli { help = "Enable metrics" )] pub use_metrics: bool, + /// Historical gap processing interval in seconds + #[arg( + long, + value_name = "HISTORY_INTERVAL", + env = "HISTORY_INTERVAL", + default_value = "259200", + help = "Interval in seconds for processing historical gaps (default: 3 days). Set to 0 to disable." + )] + pub history_interval: u64, } diff --git a/services/publisher/src/history.rs b/services/publisher/src/history.rs index 9354572b..a5b85896 100644 --- a/services/publisher/src/history.rs +++ b/services/publisher/src/history.rs @@ -14,7 +14,9 @@ use crate::{ publish::publish_block, }; +#[allow(clippy::too_many_arguments)] pub async fn process_historical_gaps_periodically( + interval_secs: u64, from_block: BlockHeight, db: &Arc, message_broker: &Arc, @@ -23,8 +25,13 @@ pub async fn process_historical_gaps_periodically( shutdown: &Arc, telemetry: &Arc>, ) -> Result<(), anyhow::Error> { - // Run every 5 hours - let mut interval = interval(Duration::from_secs(3600 * 5)); + if interval_secs == 0 { + tracing::info!("Historical gap processing is disabled"); + return Ok(()); + } + + // Run at the specified interval + let mut interval = interval(Duration::from_secs(interval_secs)); loop { if shutdown.token().is_cancelled() { diff --git a/services/publisher/src/main.rs b/services/publisher/src/main.rs index d3f87b16..065a5f77 100644 --- a/services/publisher/src/main.rs +++ b/services/publisher/src/main.rs @@ -48,6 +48,7 @@ async fn main() -> anyhow::Result<()> { tokio::join!( recover_tx_pointers(&db), process_historical_gaps_periodically( + cli.history_interval, cli.from_block.into(), &db, &message_broker, diff --git a/services/publisher/src/recover.rs b/services/publisher/src/recover.rs index 4bf7b9cd..a5cd5e19 100644 --- a/services/publisher/src/recover.rs +++ b/services/publisher/src/recover.rs @@ -25,7 +25,7 @@ async fn fetch_transaction_chunk( pool: &PgPool, offset: i64, ) -> Result> { - println!("Fetching transaction chunk with offset {}", offset); + tracing::info!("Fetching transaction chunk with offset {}", offset); sqlx::query_as::<_, TransactionRecord>( "SELECT id, block_height, tx_index FROM transactions @@ -106,7 +106,7 @@ pub async fn recover_tx_pointers(db: &Arc) -> Result<()> { total_updated += result??; } - println!( + tracing::info!( "Successfully completed transaction updates. Total transactions updated: {}", total_updated );