diff --git a/Cargo.lock b/Cargo.lock
index 20dfb2c62db..7913ad9a86b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8892,6 +8892,7 @@ dependencies = [
  "reth-payload-builder-primitives",
  "reth-payload-primitives",
  "reth-provider",
+ "reth-stages-api",
  "reth-tasks",
  "reth-tokio-util",
  "reth-transaction-pool",
diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs
index ff9eb66f100..cb9339a25fe 100644
--- a/crates/engine/service/src/service.rs
+++ b/crates/engine/service/src/service.rs
@@ -21,10 +21,10 @@ use reth_node_types::{BlockTy, NodeTypes};
 use reth_payload_builder::PayloadBuilderHandle;
 use reth_provider::{
     providers::{BlockchainProvider, ProviderNodeTypes},
-    ProviderFactory,
+    DatabaseProviderFactory, ProviderFactory,
 };
 use reth_prune::PrunerWithFactory;
-use reth_stages_api::{MetricEventsSender, Pipeline};
+use reth_stages_api::{BoxedStage, MetricEventsSender, Pipeline};
 use reth_tasks::TaskSpawner;
 use std::{
     pin::Pin,
@@ -84,6 +84,7 @@ where
         tree_config: TreeConfig,
         sync_metrics_tx: MetricEventsSender,
         evm_config: C,
+        custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
     ) -> Self
     where
         V: EngineValidator,
@@ -94,8 +95,12 @@ where
 
         let downloader = BasicBlockDownloader::new(client, consensus.clone());
 
-        let persistence_handle =
-            PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
+        let persistence_handle = PersistenceHandle::<N::Primitives>::spawn_service(
+            provider,
+            pruner,
+            sync_metrics_tx,
+            custom_stages,
+        );
 
         let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
@@ -214,6 +219,7 @@ mod tests {
             TreeConfig::default(),
             sync_metrics_tx,
             evm_config,
+            Default::default(),
         );
     }
 }
diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index de5b10c331c..d4d03db4028 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -7,10 +7,14 @@ use reth_ethereum_primitives::EthPrimitives;
 use reth_primitives_traits::NodePrimitives;
 use reth_provider::{
     providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
-    DBProvider, DatabaseProviderFactory, ProviderFactory,
+    DBProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
+    StageCheckpointWriter,
 };
 use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
-use reth_stages_api::{MetricEvent, MetricEventsSender};
+use reth_stages_api::{
+    BoxedStage, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, StageError, UnwindInput,
+    UnwindOutput,
+};
 use std::{
     sync::mpsc::{Receiver, SendError, Sender},
     time::Instant,
@@ -26,7 +30,7 @@ use tracing::{debug, error};
 ///
 /// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
 /// blocking I/O operations in an endless loop.
-#[derive(Debug)]
+#[expect(missing_debug_implementations)]
 pub struct PersistenceService<N>
 where
     N: ProviderNodeTypes,
@@ -41,6 +45,8 @@ where
     metrics: PersistenceMetrics,
     /// Sender for sync metrics - we only submit sync metrics for persisted blocks
     sync_metrics_tx: MetricEventsSender,
+    /// Custom pipeline stages advanced on new blocks.
+    custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
 }
 
 impl<N> PersistenceService<N>
 where
     N: ProviderNodeTypes,
@@ -53,8 +59,16 @@ where
         incoming: Receiver<PersistenceAction<N::Primitives>>,
         pruner: PrunerWithFactory<ProviderFactory<N>>,
         sync_metrics_tx: MetricEventsSender,
+        custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
     ) -> Self {
-        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
+        Self {
+            provider,
+            incoming,
+            pruner,
+            metrics: PersistenceMetrics::default(),
+            sync_metrics_tx,
+            custom_stages,
+        }
     }
 
     /// Prunes block data before the given block number according to the configured prune
@@ -67,12 +81,7 @@ where
         self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
         result
     }
-}
 
-impl<N> PersistenceService<N>
-where
-    N: ProviderNodeTypes,
-{
     /// This is the main loop, that will listen to database events and perform the requested
     /// database actions
     pub fn run(mut self) -> Result<(), PersistenceError> {
@@ -122,7 +131,7 @@ where
     }
 
     fn on_remove_blocks_above(
-        &self,
+        &mut self,
        new_tip_num: u64,
     ) -> Result<Option<BlockNumHash>, PersistenceError> {
         debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
@@ -130,6 +139,17 @@ where
         let provider_rw = self.provider.database_provider_rw()?;
         let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
+
+        for stage in self.custom_stages.iter_mut().rev() {
+            if let Some(checkpoint) = provider_rw.get_stage_checkpoint(stage.id())? {
+                let UnwindOutput { checkpoint } = stage.unwind(
+                    &provider_rw,
+                    UnwindInput { checkpoint, unwind_to: new_tip_num, bad_block: None },
+                )?;
+                provider_rw.save_stage_checkpoint(stage.id(), checkpoint)?;
+            }
+        }
+
         provider_rw.remove_block_and_execution_above(new_tip_num)?;
         provider_rw.commit()?;
@@ -139,7 +159,7 @@ where
     }
 
     fn on_save_blocks(
-        &self,
+        &mut self,
         blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
     ) -> Result<Option<BlockNumHash>, PersistenceError> {
         debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
@@ -149,10 +169,28 @@ where
             number: block.recovered_block().header().number(),
         });
 
-        if last_block_hash_num.is_some() {
+        if let Some(num_hash) = last_block_hash_num {
             let provider_rw = self.provider.database_provider_rw()?;
 
             provider_rw.save_blocks(blocks)?;
+
+            for stage in &mut self.custom_stages {
+                loop {
+                    let checkpoint = provider_rw.get_stage_checkpoint(stage.id())?;
+
+                    let ExecOutput { checkpoint, done } = stage.execute(
+                        &provider_rw,
+                        ExecInput { target: Some(num_hash.number), checkpoint },
+                    )?;
+
+                    provider_rw.save_stage_checkpoint(stage.id(), checkpoint)?;
+
+                    if done {
+                        break
+                    }
+                }
+            }
+
             provider_rw.commit()?;
         }
         self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
@@ -170,6 +208,10 @@ pub enum PersistenceError {
     /// A provider error
     #[error(transparent)]
     ProviderError(#[from] ProviderError),
+
+    /// A stage error
+    #[error(transparent)]
+    StageError(#[from] StageError),
 }
 
 /// A signal to the persistence service that part of the tree state can be persisted.
@@ -213,6 +255,7 @@ impl PersistenceHandle {
         provider_factory: ProviderFactory<N>,
         pruner: PrunerWithFactory<ProviderFactory<N>>,
         sync_metrics_tx: MetricEventsSender,
+        custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
     ) -> PersistenceHandle
     where
         N: ProviderNodeTypes,
@@ -224,8 +267,13 @@ impl PersistenceHandle {
         let persistence_handle = PersistenceHandle::new(db_service_tx);
 
         // spawn the persistence service
-        let db_service =
-            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
+        let db_service = PersistenceService::new(
+            provider_factory,
+            db_service_rx,
+            pruner,
+            sync_metrics_tx,
+            custom_stages,
+        );
         std::thread::Builder::new()
             .name("Persistence Service".to_string())
             .spawn(|| {
@@ -313,7 +361,12 @@ mod tests {
             Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
         let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
 
-        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
+        PersistenceHandle::<EthPrimitives>::spawn_service(
+            provider,
+            pruner,
+            sync_metrics_tx,
+            Default::default(),
+        )
     }
 
     #[tokio::test]
diff --git a/crates/node/api/Cargo.toml b/crates/node/api/Cargo.toml
index 7bd6196318d..0acbc90b170 100644
--- a/crates/node/api/Cargo.toml
+++ b/crates/node/api/Cargo.toml
@@ -26,6 +26,7 @@ reth-tasks.workspace = true
 reth-network-api.workspace = true
 reth-node-types.workspace = true
 reth-node-core.workspace = true
+reth-stages-api.workspace = true
 reth-tokio-util.workspace = true
 
 alloy-rpc-types-engine.workspace = true
diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs
index a4a46d2e0ac..80024c64d25 100644
--- a/crates/node/api/src/node.rs
+++ b/crates/node/api/src/node.rs
@@ -11,7 +11,8 @@ use reth_network_api::FullNetwork;
 use reth_node_core::node_config::NodeConfig;
 use reth_node_types::{NodeTypes, NodeTypesWithDBAdapter, TxTy};
 use reth_payload_builder::PayloadBuilderHandle;
-use reth_provider::FullProvider;
+use reth_provider::{DatabaseProviderFactory, FullProvider};
+use reth_stages_api::BoxedStage;
 use reth_tasks::TaskExecutor;
 use reth_tokio_util::EventSender;
 use reth_transaction_pool::{PoolTransaction, TransactionPool};
@@ -203,6 +204,13 @@ pub trait NodeAddOns: Send {
         self,
         ctx: AddOnsContext<'_, N>,
     ) -> impl Future<Output = eyre::Result<Self::Handle>> + Send;
+
+    /// Returns additional stages to be added to the pipeline.
+    fn extra_stages(
+        &self,
+    ) -> Vec<BoxedStage<<N::Provider as DatabaseProviderFactory>::ProviderRW>> {
+        Vec::new()
+    }
 }
 
 impl<N: FullNodeComponents> NodeAddOns<N> for () {
diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs
index 02fb505b077..0c35a8dfa85 100644
--- a/crates/node/builder/src/launch/engine.rs
+++ b/crates/node/builder/src/launch/engine.rs
@@ -151,6 +151,7 @@ impl EngineNodeLauncher {
             ctx.components().evm_config().clone(),
             maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
             ctx.era_import_source(),
+            add_ons.extra_stages(),
         )?;
 
         // The new engine writes directly to static files. This ensures that they're up to the tip.
@@ -232,6 +233,7 @@ impl EngineNodeLauncher {
             engine_tree_config,
             ctx.sync_metrics_tx(),
             ctx.components().evm_config().clone(),
+            add_ons.extra_stages(),
         );
 
         info!(target: "reth::cli", "Consensus engine initialized");
diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs
index a4099691191..e9e5e8ba79a 100644
--- a/crates/node/builder/src/setup.rs
+++ b/crates/node/builder/src/setup.rs
@@ -16,11 +16,11 @@ use reth_network_p2p::{
     bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
 };
 use reth_node_api::HeaderTy;
-use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
+use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
 use reth_stages::{
     prelude::DefaultStages,
     stages::{EraImportSource, ExecutionStage},
-    Pipeline, StageSet,
+    BoxedStage, Pipeline, StageId, StageSet,
 };
 use reth_static_file::StaticFileProducer;
 use reth_tasks::TaskExecutor;
@@ -42,6 +42,7 @@ pub fn build_networked_pipeline(
     evm_config: Evm,
     exex_manager_handle: ExExManagerHandle<N::Primitives>,
     era_import_source: Option<EraImportSource>,
+    extra_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
 ) -> eyre::Result<Pipeline<N>>
 where
     N: ProviderNodeTypes,
@@ -70,6 +71,7 @@ where
         evm_config,
         exex_manager_handle,
         era_import_source,
+        extra_stages,
     )?;
 
     Ok(pipeline)
@@ -90,6 +92,7 @@ pub fn build_pipeline(
     evm_config: Evm,
     exex_manager_handle: ExExManagerHandle<N::Primitives>,
     era_import_source: Option<EraImportSource>,
+    extra_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
 ) -> eyre::Result<Pipeline<N>>
 where
     N: ProviderNodeTypes,
@@ -129,7 +132,8 @@ where
                 stage_config.execution.into(),
                 stage_config.execution_external_clean_threshold(),
                 exex_manager_handle,
-            )),
+            ))
+            .add_stages_before(extra_stages, StageId::Finish),
         )
         .build(provider_factory, static_file_producer);
diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs
index 2446219ea3d..2f2fef37a4e 100644
--- a/crates/stages/api/src/pipeline/mod.rs
+++ b/crates/stages/api/src/pipeline/mod.rs
@@ -35,7 +35,7 @@ use reth_errors::{ProviderResult, RethResult};
 pub use set::*;
 
 /// A container for a queued stage.
-pub(crate) type BoxedStage<Provider> = Box<dyn Stage<Provider>>;
+pub type BoxedStage<Provider> = Box<dyn Stage<Provider>>;
 
 /// The future that returns the owned pipeline and the result of the pipeline run. See
 /// [`Pipeline::run_as_fut`].
diff --git a/crates/stages/api/src/pipeline/set.rs b/crates/stages/api/src/pipeline/set.rs
index c39dafae99f..daa752a2fc7 100644
--- a/crates/stages/api/src/pipeline/set.rs
+++ b/crates/stages/api/src/pipeline/set.rs
@@ -172,6 +172,24 @@ impl StageSetBuilder {
         self
     }
 
+    /// Adds given [`Stage`]s before the stage with the given [`StageId`].
+    ///
+    /// If the stage was already in the group, it is removed from its previous place.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the dependency stage is not in this set.
+    pub fn add_stages_before<S: Stage<Provider> + 'static>(
+        mut self,
+        stages: Vec<S>,
+        before: StageId,
+    ) -> Self {
+        for stage in stages {
+            self = self.add_before(stage, before);
+        }
+        self
+    }
+
     /// Adds the given [`Stage`] after the stage with the given [`StageId`].
     ///
     /// If the stage was already in the group, it is removed from its previous place.
diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs
index 97c3a3116aa..3119fec011e 100644
--- a/crates/stages/stages/src/sets.rs
+++ b/crates/stages/stages/src/sets.rs
@@ -135,30 +135,6 @@ where
     }
 }
 
-impl DefaultStages
-where
-    E: ConfigureEvm,
-    H: HeaderDownloader,
-    B: BodyDownloader,
-{
-    /// Appends the default offline stages and default finish stage to the given builder.
-    pub fn add_offline_stages(
-        default_offline: StageSetBuilder,
-        evm_config: E,
-        consensus: Arc>,
-        stages_config: StageConfig,
-        prune_modes: PruneModes,
-    ) -> StageSetBuilder
-    where
-        OfflineStages: StageSet,
-    {
-        StageSetBuilder::default()
-            .add_set(default_offline)
-            .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
-            .add_stage(FinishStage)
-    }
-}
-
 impl StageSet for DefaultStages
 where
     P: HeaderSyncGapProvider + 'static,
@@ -169,13 +145,11 @@ where
     OfflineStages: StageSet,
 {
     fn builder(self) -> StageSetBuilder {
-        Self::add_offline_stages(
-            self.online.builder(),
-            self.evm_config,
-            self.consensus,
-            self.stages_config.clone(),
-            self.prune_modes,
-        )
+        let Self { online, evm_config, consensus, stages_config, prune_modes } = self;
+        StageSetBuilder::default()
+            .add_set(online.builder())
+            .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
+            .add_stage(FinishStage)
     }
 }
diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs
index d3e690dc516..c3ce35f9b0a 100644
--- a/crates/stages/stages/src/stages/headers.rs
+++ b/crates/stages/stages/src/stages/headers.rs
@@ -25,7 +25,10 @@ use reth_stages_api::{
 };
 use reth_static_file_types::StaticFileSegment;
 use reth_storage_errors::provider::ProviderError;
-use std::task::{ready, Context, Poll};
+use std::{
+    fmt::Debug,
+    task::{ready, Context, Poll},
+};
 use tokio::sync::watch;
 use tracing::*;
@@ -186,7 +189,7 @@ where
 
 impl<Provider, P, D> Stage<Provider> for HeaderStage<P, D>
 where
-    Provider: DBProvider + StaticFileProviderFactory,
+    Provider: DBProvider + StaticFileProviderFactory + Debug,
     P: HeaderSyncGapProvider<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
     D: HeaderDownloader<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
     <Provider::Primitives as NodePrimitives>::BlockHeader: FullBlockHeader + Value,