Merged
Commits (53)
436dbd7  feat(storage): introduce storage proof worker pool (yongkangc, Oct 7, 2025)
fbeec50  fmt, clippy (yongkangc, Oct 7, 2025)
ab82386  add fallback (yongkangc, Oct 7, 2025)
13891ad  fix comments (yongkangc, Oct 7, 2025)
d4e0adb  refactor(metrics): remove unused storage proof metrics from ProofTask… (yongkangc, Oct 7, 2025)
2957afa  refactor(proof_task): improve documentation and rename variables for … (yongkangc, Oct 7, 2025)
800dcf6  refactor(proof_task): streamline documentation and clarify task manag… (yongkangc, Oct 7, 2025)
29d48d4  refactor(config): remove storage proof worker configuration (yongkangc, Oct 7, 2025)
3fb97c6  refactor(proof_task): enhance comments and adjust queue capacity logic (yongkangc, Oct 7, 2025)
5779b86  disable max concurrency (yongkangc, Oct 7, 2025)
0e33837  nits (yongkangc, Oct 7, 2025)
3bcbc71  Update crates/trie/parallel/src/proof_task.rs (yongkangc, Oct 7, 2025)
4a67076  Update crates/trie/parallel/src/proof_task.rs (yongkangc, Oct 7, 2025)
b2d5bcc  using unbounded queue (yongkangc, Oct 7, 2025)
8f4e3a1  rm comment (yongkangc, Oct 7, 2025)
b4bf193  refactor(proof_task): optimize storage proof computation by reusing c… (yongkangc, Oct 7, 2025)
6282d2e  propogate error up (yongkangc, Oct 7, 2025)
838dc67  reduce scope of pr - exclude all accs (yongkangc, Oct 7, 2025)
5897945  fmt, clippy (yongkangc, Oct 7, 2025)
6b5de7c  fmt (yongkangc, Oct 7, 2025)
05e0eb8  refactor(proof_task): consolidate blinded storage node with storage p… (yongkangc, Oct 7, 2025)
4829de9  rm comment (yongkangc, Oct 7, 2025)
6472cfe  simplify worker concurrency (yongkangc, Oct 7, 2025)
61ecc9a  bump to error! (yongkangc, Oct 7, 2025)
30f6fda  Update crates/engine/tree/src/tree/payload_processor/mod.rs (yongkangc, Oct 7, 2025)
4680336  handle sending error back (yongkangc, Oct 7, 2025)
58d6f8b  fmt (yongkangc, Oct 7, 2025)
59b0353  fix fmt (yongkangc, Oct 7, 2025)
93c67e8  Enhance TreeConfig with storage worker count configuration (yongkangc, Oct 7, 2025)
1954502  update message (yongkangc, Oct 7, 2025)
2429320  refactor(proof_task): use impl bound (yongkangc, Oct 8, 2025)
1902b43  make spawning falliable (yongkangc, Oct 8, 2025)
8fb0dd1  remove error log, as we propogate up (yongkangc, Oct 8, 2025)
e0010d7  Apply suggestion from @Copilot (yongkangc, Oct 8, 2025)
53cd4ba  use expect instead of unwrap (yongkangc, Oct 8, 2025)
e854628  Update crates/trie/parallel/src/proof_task.rs (yongkangc, Oct 8, 2025)
3b17cc7  Update crates/trie/parallel/src/proof_task.rs (yongkangc, Oct 8, 2025)
6c89cf4  consolidate (yongkangc, Oct 8, 2025)
c5f6eb9  removed the unnecessary remaining_concurrency variable allocation (yongkangc, Oct 8, 2025)
af73c7a  clippy (yongkangc, Oct 8, 2025)
a8e52bc  Apply suggestion from @yongkangc (yongkangc, Oct 8, 2025)
c48b328  address brian's pr (yongkangc, Oct 8, 2025)
3eff3e2  Merge branch 'main' into yk/worker_pool_storage_2 (yongkangc, Oct 8, 2025)
7efff3d  Update crates/trie/parallel/src/proof_task.rs (yongkangc, Oct 8, 2025)
f7cd93f  Update crates/trie/parallel/src/proof_task.rs (yongkangc, Oct 8, 2025)
f823c6b  Refactor error handling in StorageWorkerJob to use a consistent error… (yongkangc, Oct 8, 2025)
9c08aed  Refactor error handling in StorageWorkerJob to use structured error t… (yongkangc, Oct 8, 2025)
7f9ec06  fmt, clipy (yongkangc, Oct 8, 2025)
6d41352  fix (yongkangc, Oct 8, 2025)
4ca404e  bump up workers (yongkangc, Oct 9, 2025)
d66ed61  cli flag for storage (yongkangc, Oct 9, 2025)
4aba3de  docs: update CLI reference for storage-worker-count flag (yongkangc, Oct 10, 2025)
8e64738  docs: clarify storage-worker-count uses Tokio blocking pool (yongkangc, Oct 10, 2025)
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

33 changes: 32 additions & 1 deletion crates/engine/primitives/src/config.rs
@@ -6,9 +6,22 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;

/// Default maximum concurrency for proof tasks
/// Default maximum concurrency for on-demand proof tasks (blinded nodes)
pub const DEFAULT_MAX_PROOF_TASK_CONCURRENCY: u64 = 256;

/// Returns the default number of storage worker threads based on available parallelism.
/// Defaults to half of available parallelism, clamped between 2 and 8.
fn default_storage_worker_count() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map(|n| (n.get() / 2).clamp(2, 8)).unwrap_or(4)
}
#[cfg(not(feature = "std"))]
{
4
}
}

/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 10;

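The clamp above is worth spelling out: half of the detected parallelism, bounded to the range 2..=8, with 4 as the fallback when parallelism cannot be queried (or under no-std). A minimal standalone sketch of the same arithmetic, not the crate's code:

```rust
// Re-statement of the default_storage_worker_count() clamp, for illustration.
fn clamped(parallelism: usize) -> usize {
    (parallelism / 2).clamp(2, 8)
}

fn main() {
    assert_eq!(clamped(2), 2); // 2 cores: 1 is clamped up to the floor of 2
    assert_eq!(clamped(8), 4); // 8 cores: half of parallelism, inside the range
    assert_eq!(clamped(32), 8); // 32 cores: 16 is clamped down to the cap of 8
}
```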
@@ -109,6 +122,8 @@ pub struct TreeConfig {
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
}

impl Default for TreeConfig {
@@ -135,6 +150,7 @@
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: DEFAULT_PREWARM_MAX_CONCURRENCY,
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
}
}
}
@@ -164,7 +180,9 @@
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
Self {
persistence_threshold,
memory_block_buffer_target,
@@ -187,6 +205,7 @@
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
}
}

@@ -394,6 +413,7 @@
mut self,
max_proof_task_concurrency: u64,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
self.max_proof_task_concurrency = max_proof_task_concurrency;
self
}
@@ -452,4 +472,15 @@
pub const fn prewarm_max_concurrency(&self) -> usize {
self.prewarm_max_concurrency
}

/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
}

/// Setter for the number of storage proof worker threads.
pub const fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
self.storage_worker_count = storage_worker_count;
self
}
}
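Together, the accessor and the const setter give callers a builder-style override of the auto-detected default; a minimal usage sketch (the import path is assumed, not shown in this diff):

```rust
use reth_engine_primitives::TreeConfig; // assumed import path

// Override the default (half of available parallelism, clamped to 2..=8).
let config = TreeConfig::default().with_storage_worker_count(8);
assert_eq!(config.storage_worker_count(), 8);
```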
26 changes: 16 additions & 10 deletions crates/engine/tree/benches/state_root_task.rs
@@ -228,16 +228,22 @@ fn bench_state_root(c: &mut Criterion) {
},
|(genesis_hash, mut payload_processor, provider, state_updates)| {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
);
let mut handle = payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");

let mut state_hook = handle.state_hook();

59 changes: 45 additions & 14 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -45,7 +45,7 @@ use std::sync::{
mpsc::{self, channel, Sender},
Arc,
};
use tracing::{debug, instrument};
use tracing::{debug, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
@@ -166,6 +170,10 @@
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
///
/// Returns an error with the original transactions iterator if the proof task manager fails to
/// initialize.
#[allow(clippy::type_complexity)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
@@ -174,7 +178,10 @@
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
config: &TreeConfig,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> Result<
PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
(reth_provider::ProviderError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
@@ -196,12 +203,25 @@
state_root_config.prefix_sets.clone(),
);
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let proof_task = ProofTaskManager::new(
let storage_worker_count = config.storage_worker_count();
let proof_task = match ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
max_proof_task_concurrency,
);
storage_worker_count,
) {
Ok(task) => task,
Err(error) => {
// Fall back to parallel state root if proof task manager fails to initialize
tracing::error!(
target: "engine::tree",
?error,
"Failed to initialize proof task manager, falling back to parallel state root"
);
return Err((error, transactions, env, provider_builder));
}
};

// We set it to half of the proof task concurrency, because often for each multiproof we
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
@@ -252,12 +272,12 @@
}
});

PayloadHandle {
Ok(PayloadHandle {
to_multi_proof,
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
}
})
}

/// Spawns a task that exclusively handles cache prewarming for transaction execution.
@@ -466,6 +486,11 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
}

/// Returns `true` if the handle is connected to a background state root task.
pub const fn supports_state_root(&self) -> bool {
self.state_root.is_some()
}

/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
@@ -857,14 +882,20 @@
PrecompileCacheMap::default(),
);
let provider = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
);
let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");

let mut state_hook = handle.state_hook();

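The new `supports_state_root` accessor exposes whether a background state root task is attached to the handle; a hypothetical caller-side check (the branch bodies are placeholders, not code from this PR):

```rust
// Sketch: choose a strategy based on what the spawned handle supports.
if handle.supports_state_root() {
    // A multiproof/state-root task is running; await its result.
} else {
    // Cache-prewarm-only handle; fall back to the parallel state root.
}
```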
@@ -1236,7 +1236,9 @@ mod tests {
config.consistent_view.clone(),
task_ctx,
1,
);
1,
)
.expect("Failed to create ProofTaskManager");
let channel = channel();

MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
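As both test updates show, `ProofTaskManager::new` now takes a storage worker count as its fifth argument and returns a `Result`, so construction can fail; a sketch of the new call shape inferred from the diffs (variable names are illustrative):

```rust
// Call shape inferred from the test changes in this PR; names are placeholders.
let proof_task = ProofTaskManager::new(
    executor.handle().clone(),  // Tokio runtime handle
    consistent_view.clone(),    // consistent database view
    task_ctx,                   // ProofTaskCtx
    max_proof_task_concurrency, // on-demand (blinded node) concurrency
    storage_worker_count,       // dedicated storage proof workers
)
.expect("failed to initialize proof task manager");
```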
42 changes: 31 additions & 11 deletions crates/engine/tree/src/tree/payload_validator.rs
@@ -877,17 +877,37 @@ where
// too expensive because it requires walking all paths in every proof.
let spawn_start = Instant::now();
let (handle, strategy) = if trie_input.prefix_sets.is_empty() {
(
self.payload_processor.spawn(
env,
txs,
provider_builder,
consistent_view,
trie_input,
&self.config,
),
StateRootStrategy::StateRootTask,
)
match self.payload_processor.spawn(
env,
txs,
provider_builder,
consistent_view,
trie_input,
&self.config,
) {
Ok(handle) => {
// Successfully spawned with state root task support
(handle, StateRootStrategy::StateRootTask)
}
Err((error, txs, env, provider_builder)) => {
// Failed to initialize proof task manager, fallback to parallel state
// root
error!(
target: "engine::tree",
block=?block_num_hash,
?error,
"Failed to initialize proof task manager, falling back to parallel state root"
);
(
self.payload_processor.spawn_cache_exclusive(
env,
txs,
provider_builder,
),
StateRootStrategy::Parallel,
)
}
}
// if prefix sets are not empty, we spawn a task that exclusively handles cache
// prewarming for transaction execution
} else {
1 change: 1 addition & 0 deletions crates/trie/parallel/Cargo.toml
@@ -36,6 +36,7 @@ derive_more.workspace = true
rayon.workspace = true
itertools.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
crossbeam-channel.workspace = true

# `metrics` feature
reth-metrics = { workspace = true, optional = true }
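The new `crossbeam-channel` dependency backs the storage proof worker pool, and a later commit in this PR switches it to an unbounded queue. A self-contained sketch of that pattern with placeholder types, independent of the reth implementation:

```rust
use crossbeam_channel::unbounded;
use std::thread;

// Placeholder for StorageWorkerJob; the real job type carries proof inputs
// and a channel on which the worker sends the computed proof back.
struct Job(u64);

fn main() {
    let (tx, rx) = unbounded::<Job>();

    // Fixed pool of workers; crossbeam receivers are cloneable, so every
    // worker pulls from the same unbounded queue.
    let workers: Vec<_> = (0..4)
        .map(|id| {
            let rx = rx.clone();
            thread::spawn(move || {
                while let Ok(Job(n)) = rx.recv() {
                    println!("worker {id}: processed job {n}");
                }
                // recv() errors once all senders are dropped; the worker exits.
            })
        })
        .collect();

    for n in 0..8 {
        tx.send(Job(n)).expect("workers disconnected");
    }
    drop(tx); // close the queue so the workers shut down

    for w in workers {
        w.join().expect("worker panicked");
    }
}
```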
3 changes: 2 additions & 1 deletion crates/trie/parallel/src/proof.rs
@@ -448,7 +448,8 @@ mod tests {
let task_ctx =
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
let proof_task =
ProofTaskManager::new(rt.handle().clone(), consistent_view.clone(), task_ctx, 1);
ProofTaskManager::new(rt.handle().clone(), consistent_view.clone(), task_ctx, 1, 1)
.unwrap();
let proof_task_handle = proof_task.handle();

// keep the join handle around to make sure it does not return any errors