Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
DatabaseProviderFactory, ProviderFactory,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_stages_api::{BoxedStage, MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
Expand Down Expand Up @@ -84,6 +84,7 @@ where
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
) -> Self
where
V: EngineValidator<N::Payload>,
Expand All @@ -94,8 +95,12 @@ where

let downloader = BasicBlockDownloader::new(client, consensus.clone());

let persistence_handle =
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
let persistence_handle = PersistenceHandle::<EthPrimitives>::spawn_service(
provider,
pruner,
sync_metrics_tx,
custom_stages,
);

let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();

Expand Down Expand Up @@ -214,6 +219,7 @@ mod tests {
TreeConfig::default(),
sync_metrics_tx,
evm_config,
Default::default(),
);
}
}
83 changes: 68 additions & 15 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory,
DBProvider, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_stages_api::{
BoxedStage, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, StageError, UnwindInput,
UnwindOutput,
};
use std::{
sync::mpsc::{Receiver, SendError, Sender},
time::Instant,
Expand All @@ -26,7 +30,7 @@ use tracing::{debug, error};
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
#[expect(missing_debug_implementations)]
pub struct PersistenceService<N>
where
N: ProviderNodeTypes,
Expand All @@ -41,6 +45,8 @@ where
metrics: PersistenceMetrics,
/// Sender for sync metrics - we only submit sync metrics for persisted blocks
sync_metrics_tx: MetricEventsSender,
/// Custom pipeline stages advanced on new blocks.
custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
}

impl<N> PersistenceService<N>
Expand All @@ -53,8 +59,16 @@ where
incoming: Receiver<PersistenceAction<N::Primitives>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
) -> Self {
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
Self {
provider,
incoming,
pruner,
metrics: PersistenceMetrics::default(),
sync_metrics_tx,
custom_stages,
}
}

/// Prunes block data before the given block number according to the configured prune
Expand All @@ -67,12 +81,7 @@ where
self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
result
}
}

impl<N> PersistenceService<N>
where
N: ProviderNodeTypes,
{
/// This is the main loop, that will listen to database events and perform the requested
/// database actions
pub fn run(mut self) -> Result<(), PersistenceError> {
Expand Down Expand Up @@ -122,14 +131,25 @@ where
}

fn on_remove_blocks_above(
&self,
&mut self,
new_tip_num: u64,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.database_provider_rw()?;

let new_tip_hash = provider_rw.block_hash(new_tip_num)?;

for stage in self.custom_stages.iter_mut().rev() {
if let Some(checkpoint) = provider_rw.get_stage_checkpoint(stage.id())? {
let UnwindOutput { checkpoint } = stage.unwind(
&provider_rw,
UnwindInput { checkpoint, unwind_to: new_tip_num, bad_block: None },
)?;
provider_rw.save_stage_checkpoint(stage.id(), checkpoint)?;
}
}

provider_rw.remove_block_and_execution_above(new_tip_num)?;
provider_rw.commit()?;

Expand All @@ -139,7 +159,7 @@ where
}

fn on_save_blocks(
&self,
&mut self,
blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
Expand All @@ -149,10 +169,28 @@ where
number: block.recovered_block().header().number(),
});

if last_block_hash_num.is_some() {
if let Some(num_hash) = last_block_hash_num {
let provider_rw = self.provider.database_provider_rw()?;

provider_rw.save_blocks(blocks)?;

for stage in &mut self.custom_stages {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we document here that we need invoke those after we save the blocks, because we want them to run last

loop {
let checkpoint = provider_rw.get_stage_checkpoint(stage.id())?;

let ExecOutput { checkpoint, done } = stage.execute(
&provider_rw,
ExecInput { target: Some(num_hash.number), checkpoint },
)?;

provider_rw.save_stage_checkpoint(stage.id(), checkpoint)?;

if done {
break
}
}
}

provider_rw.commit()?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Expand All @@ -170,6 +208,10 @@ pub enum PersistenceError {
/// A provider error
#[error(transparent)]
ProviderError(#[from] ProviderError),

/// A stage error
#[error(transparent)]
StageError(#[from] StageError),
}

/// A signal to the persistence service that part of the tree state can be persisted.
Expand Down Expand Up @@ -213,6 +255,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
custom_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
) -> PersistenceHandle<N::Primitives>
where
N: ProviderNodeTypes,
Expand All @@ -224,8 +267,13 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
let persistence_handle = PersistenceHandle::new(db_service_tx);

// spawn the persistence service
let db_service =
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
let db_service = PersistenceService::new(
provider_factory,
db_service_rx,
pruner,
sync_metrics_tx,
custom_stages,
);
std::thread::Builder::new()
.name("Persistence Service".to_string())
.spawn(|| {
Expand Down Expand Up @@ -313,7 +361,12 @@ mod tests {
Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
PersistenceHandle::<EthPrimitives>::spawn_service(
provider,
pruner,
sync_metrics_tx,
Default::default(),
)
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions crates/node/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ reth-tasks.workspace = true
reth-network-api.workspace = true
reth-node-types.workspace = true
reth-node-core.workspace = true
reth-stages-api.workspace = true
reth-tokio-util.workspace = true

alloy-rpc-types-engine.workspace = true
Expand Down
10 changes: 9 additions & 1 deletion crates/node/api/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use reth_network_api::FullNetwork;
use reth_node_core::node_config::NodeConfig;
use reth_node_types::{NodeTypes, NodeTypesWithDBAdapter, TxTy};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::FullProvider;
use reth_provider::{DatabaseProviderFactory, FullProvider};
use reth_stages_api::BoxedStage;
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_transaction_pool::{PoolTransaction, TransactionPool};
Expand Down Expand Up @@ -203,6 +204,13 @@ pub trait NodeAddOns<N: FullNodeComponents>: Send {
self,
ctx: AddOnsContext<'_, N>,
) -> impl Future<Output = eyre::Result<Self::Handle>> + Send;

/// Returns additional stages to be added to the pipeline.
fn extra_stages(
&self,
) -> Vec<BoxedStage<<N::Provider as DatabaseProviderFactory>::ProviderRW>> {
Vec::new()
}
Comment on lines +208 to +213
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense here I believe

}

impl<N: FullNodeComponents> NodeAddOns<N> for () {
Expand Down
2 changes: 2 additions & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl EngineNodeLauncher {
ctx.components().evm_config().clone(),
maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
ctx.era_import_source(),
add_ons.extra_stages(),
)?;

// The new engine writes directly to static files. This ensures that they're up to the tip.
Expand Down Expand Up @@ -232,6 +233,7 @@ impl EngineNodeLauncher {
engine_tree_config,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
add_ons.extra_stages(),
);

info!(target: "reth::cli", "Consensus engine initialized");
Expand Down
10 changes: 7 additions & 3 deletions crates/node/builder/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use reth_network_p2p::{
bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
};
use reth_node_api::HeaderTy;
use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{
prelude::DefaultStages,
stages::{EraImportSource, ExecutionStage},
Pipeline, StageSet,
BoxedStage, Pipeline, StageId, StageSet,
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
Expand All @@ -42,6 +42,7 @@ pub fn build_networked_pipeline<N, Client, Evm>(
evm_config: Evm,
exex_manager_handle: ExExManagerHandle<N::Primitives>,
era_import_source: Option<EraImportSource>,
extra_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
) -> eyre::Result<Pipeline<N>>
where
N: ProviderNodeTypes,
Expand Down Expand Up @@ -70,6 +71,7 @@ where
evm_config,
exex_manager_handle,
era_import_source,
extra_stages,
)?;

Ok(pipeline)
Expand All @@ -90,6 +92,7 @@ pub fn build_pipeline<N, H, B, Evm>(
evm_config: Evm,
exex_manager_handle: ExExManagerHandle<N::Primitives>,
era_import_source: Option<EraImportSource>,
extra_stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
) -> eyre::Result<Pipeline<N>>
where
N: ProviderNodeTypes,
Expand Down Expand Up @@ -129,7 +132,8 @@ where
stage_config.execution.into(),
stage_config.execution_external_clean_threshold(),
exex_manager_handle,
)),
))
.add_stages_before(extra_stages, StageId::Finish),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're fine if these are put before finish stage

)
.build(provider_factory, static_file_producer);

Expand Down
2 changes: 1 addition & 1 deletion crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use reth_errors::{ProviderResult, RethResult};
pub use set::*;

/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
pub type BoxedStage<Provider> = Box<dyn Stage<Provider>>;

/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [`Pipeline::run_as_fut`].
Expand Down
18 changes: 18 additions & 0 deletions crates/stages/api/src/pipeline/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,24 @@ impl<Provider> StageSetBuilder<Provider> {
self
}

/// Adds given [`Stage`]s before the stage with the given [`StageId`].
///
/// If the stage was already in the group, it is removed from its previous place.
///
/// # Panics
///
/// Panics if the dependency stage is not in this set.
pub fn add_stages_before<S: Stage<Provider> + 'static>(
mut self,
stages: Vec<S>,
before: StageId,
) -> Self {
for stage in stages {
self = self.add_before(stage, before);
}
self
}

/// Adds the given [`Stage`] after the stage with the given [`StageId`].
///
/// If the stage was already in the group, it is removed from its previous place.
Expand Down
Loading
Loading