Skip to content
Merged
Show file tree
Hide file tree
Changes from 115 commits
Commits
Show all changes
150 commits
Select commit Hold shift + click to select a range
436dbd7
feat(storage): introduce storage proof worker pool
yongkangc Oct 7, 2025
fbeec50
fmt, clippy
yongkangc Oct 7, 2025
ab82386
add fallback
yongkangc Oct 7, 2025
13891ad
fix comments
yongkangc Oct 7, 2025
d4e0adb
refactor(metrics): remove unused storage proof metrics from ProofTask…
yongkangc Oct 7, 2025
2957afa
refactor(proof_task): improve documentation and rename variables for …
yongkangc Oct 7, 2025
800dcf6
refactor(proof_task): streamline documentation and clarify task manag…
yongkangc Oct 7, 2025
29d48d4
refactor(config): remove storage proof worker configuration
yongkangc Oct 7, 2025
3fb97c6
refactor(proof_task): enhance comments and adjust queue capacity logic
yongkangc Oct 7, 2025
5779b86
disable max concurrency
yongkangc Oct 7, 2025
0e33837
nits
yongkangc Oct 7, 2025
3bcbc71
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 7, 2025
4a67076
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 7, 2025
b2d5bcc
using unbounded queue
yongkangc Oct 7, 2025
8f4e3a1
rm comment
yongkangc Oct 7, 2025
b4bf193
refactor(proof_task): optimize storage proof computation by reusing c…
yongkangc Oct 7, 2025
6282d2e
propogate error up
yongkangc Oct 7, 2025
838dc67
reduce scope of pr - exclude all accs
yongkangc Oct 7, 2025
5897945
fmt, clippy
yongkangc Oct 7, 2025
6b5de7c
fmt
yongkangc Oct 7, 2025
05e0eb8
refactor(proof_task): consolidate blinded storage node with storage p…
yongkangc Oct 7, 2025
4829de9
rm comment
yongkangc Oct 7, 2025
6472cfe
simplify worker concurrency
yongkangc Oct 7, 2025
61ecc9a
bump to error!
yongkangc Oct 7, 2025
30f6fda
Update crates/engine/tree/src/tree/payload_processor/mod.rs
yongkangc Oct 7, 2025
4680336
handle sending error back
yongkangc Oct 7, 2025
58d6f8b
fmt
yongkangc Oct 7, 2025
59b0353
fix fmt
yongkangc Oct 7, 2025
93c67e8
Enhance TreeConfig with storage worker count configuration
yongkangc Oct 7, 2025
1954502
update message
yongkangc Oct 7, 2025
2429320
refactor(proof_task): use impl bound
yongkangc Oct 8, 2025
1902b43
make spawning falliable
yongkangc Oct 8, 2025
8fb0dd1
remove error log, as we propogate up
yongkangc Oct 8, 2025
e0010d7
Apply suggestion from @Copilot
yongkangc Oct 8, 2025
53cd4ba
use expect instead of unwrap
yongkangc Oct 8, 2025
e854628
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
3b17cc7
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
6c89cf4
consolidate
yongkangc Oct 8, 2025
c5f6eb9
removed the unnecessary remaining_concurrency variable allocation
yongkangc Oct 8, 2025
af73c7a
clippy
yongkangc Oct 8, 2025
a8e52bc
Apply suggestion from @yongkangc
yongkangc Oct 8, 2025
c48b328
address brian's pr
yongkangc Oct 8, 2025
3eff3e2
Merge branch 'main' into yk/worker_pool_storage_2
yongkangc Oct 8, 2025
4934099
feat: added initial structs for acc proofs support for task manager
yongkangc Oct 8, 2025
d847799
feat: add account worker count to TreeConfig and update task manager
yongkangc Oct 8, 2025
4c9c2ee
feat: implement account worker functionality in ProofTaskManager
yongkangc Oct 8, 2025
2f10f99
refactor: enhance ProofTaskManager with account worker integration
yongkangc Oct 8, 2025
f5538b2
refactor: slight cleanup
yongkangc Oct 8, 2025
aeff6d2
refactor spawning workers into a generic function
yongkangc Oct 8, 2025
c4c2b4b
refactor: integrate account proof task handling in MultiproofManager
yongkangc Oct 8, 2025
4e30bba
fmt
yongkangc Oct 8, 2025
509898b
refactor: implement account worker loop
yongkangc Oct 8, 2025
7892d54
pass in `missed_leaves_storage_roots`
yongkangc Oct 8, 2025
26b8ca8
refactor: remove db
yongkangc Oct 8, 2025
c48085d
remove db
yongkangc Oct 8, 2025
66bf694
refactor: streamline account multiproof generation
yongkangc Oct 8, 2025
ee16627
refactor: add mapping
yongkangc Oct 8, 2025
7fbd172
refactor: enhance account worker loop and storage proof collection
yongkangc Oct 8, 2025
0884e15
refactor: improve account multiproof processing and storage proof han…
yongkangc Oct 8, 2025
d76cc86
refactor: optimize account multiproof processing and error handling
yongkangc Oct 8, 2025
8e45538
refactor: enhance account worker management and task routing
yongkangc Oct 8, 2025
8be550a
refactor: improve task management in ProofTaskManager
yongkangc Oct 8, 2025
087e875
refactor: remove unused blinded account node retrieval function
yongkangc Oct 8, 2025
0646f14
fmt
yongkangc Oct 8, 2025
886cb6f
changed to error
yongkangc Oct 8, 2025
e51707a
refactor: simplify error handling in proof task management
yongkangc Oct 8, 2025
4941071
rm comment
yongkangc Oct 8, 2025
d6401ab
fix rebase conflict
yongkangc Oct 8, 2025
9fa34b0
fmt
yongkangc Oct 8, 2025
d0e2ba1
removed generic based on brian's suggestion
yongkangc Oct 8, 2025
7efff3d
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
f7cd93f
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
f823c6b
Refactor error handling in StorageWorkerJob to use a consistent error…
yongkangc Oct 8, 2025
9c08aed
Refactor error handling in StorageWorkerJob to use structured error t…
yongkangc Oct 8, 2025
7f9ec06
fmt, clipy
yongkangc Oct 8, 2025
6d41352
fix
yongkangc Oct 8, 2025
dc5ba15
moved `build_account_multiproof_with_storage_roots` to prooftask.rs
yongkangc Oct 9, 2025
14b6356
refactor: remove generic type parameter from `ProofTaskManagerHandle`
yongkangc Oct 9, 2025
5e1ca83
fmt
yongkangc Oct 9, 2025
3b815f6
refactor: rename storage proof task handle for clarity
yongkangc Oct 9, 2025
7df3488
refactor: simplify `ProofTaskManager` by removing generic type parameter
yongkangc Oct 9, 2025
e418bb1
refactor: improve documentation for proof task handles in `Multiproof…
yongkangc Oct 9, 2025
498cd20
Merge branch 'yk/worker_pool_storage_2' into yk/worker_pool_acc
yongkangc Oct 9, 2025
1bbfc7c
clippy
yongkangc Oct 9, 2025
a654cf8
rm clone from `StorageProofInput`
yongkangc Oct 9, 2025
3c00e98
refactor: use receiver to allow Account worker and storage workers ru…
yongkangc Oct 9, 2025
e427485
Removed the unused _proof_tx parameter from queue_storage_proofs
yongkangc Oct 9, 2025
5647e22
avoid extra alloc on map
yongkangc Oct 9, 2025
b4d109b
clippy
yongkangc Oct 9, 2025
84ad218
refactor: update account worker loop to use crossbeam sender for stor…
yongkangc Oct 9, 2025
9e177f4
feat: add default account worker count for optimized I/O-bound coordi…
yongkangc Oct 9, 2025
4596366
refactor: clarify comments for proof task handles in MultiproofManager
yongkangc Oct 9, 2025
19a9e8b
comment
yongkangc Oct 9, 2025
2091f0d
expect workers to be avaliable
yongkangc Oct 9, 2025
4ca404e
bump up workers
yongkangc Oct 9, 2025
d66ed61
cli flag for storage
yongkangc Oct 9, 2025
4aba3de
docs: update CLI reference for storage-worker-count flag
yongkangc Oct 10, 2025
8e64738
docs: clarify storage-worker-count uses Tokio blocking pool
yongkangc Oct 10, 2025
5d10934
feat: add storage and account worker count configuration options
yongkangc Oct 10, 2025
05f177a
added a todo
yongkangc Oct 10, 2025
aae5c7a
refactor: simplify account multiproof function parameters
yongkangc Oct 10, 2025
1bc2862
move the AccountMultiproofParams struct to just below AccountMultipro…
yongkangc Oct 10, 2025
5a69925
Merge branch 'yk/worker_pool_storage_2' into yk/worker_pool_acc
yongkangc Oct 10, 2025
f23cfab
refactor: addressed dan's comment on making structs that encapsulate…
yongkangc Oct 10, 2025
47398ce
refactor: simplify ParallelProof struct by removing generic Factory type
yongkangc Oct 10, 2025
d5fedd8
removing from test
yongkangc Oct 10, 2025
444b24c
refactor: remove unused Factory type from proof tests
yongkangc Oct 10, 2025
11130c7
refactor: simplify MultiProofConfig and related structures
yongkangc Oct 10, 2025
47da382
clippy
yongkangc Oct 10, 2025
0a88a18
fmt
yongkangc Oct 10, 2025
870938f
Merge branch 'main' into yk/worker_pool_acc
yongkangc Oct 10, 2025
14def07
fmt
yongkangc Oct 10, 2025
44bca43
made count same as storage worker
yongkangc Oct 10, 2025
3516451
refactor: update proof task management to use spawn_proof_workers
yongkangc Oct 10, 2025
ed45ebd
refactor: yeet proof task manager
yongkangc Oct 10, 2025
d44180d
fix comment
yongkangc Oct 10, 2025
0b18f64
feat: introduce minimum worker count configuration
yongkangc Oct 10, 2025
42ceedd
fix: prevent active_handles underflow in ProofTaskManagerHandle
yongkangc Oct 10, 2025
8e00a4a
clippy
yongkangc Oct 10, 2025
2b90133
fmt
yongkangc Oct 10, 2025
3f6400e
refactor: improve error handling in ProofTaskTrieNodeProvider
yongkangc Oct 10, 2025
f302447
fix count
yongkangc Oct 10, 2025
d1eb0ec
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 10, 2025
4e00944
merge
yongkangc Oct 10, 2025
833b031
refactor: streamline worker count validation
yongkangc Oct 10, 2025
40bb506
refactor: consolidate proof task handles in MultiproofManager
yongkangc Oct 10, 2025
e49791e
refactor: simplify ProofTaskMetrics default implementation
yongkangc Oct 10, 2025
c02a68d
refactor: improve error handling in trie_node method
yongkangc Oct 10, 2025
2698cc8
refactor: enhance error handling in trie_node method
yongkangc Oct 13, 2025
c68eb7a
refactor: simplify worker count handling in TreeConfig
yongkangc Oct 13, 2025
6a4766c
fix clippy
yongkangc Oct 13, 2025
3e957a5
refactor(tree): remove unused Factory generic from multiproof system …
yongkangc Oct 13, 2025
18ff58d
fix clippy
yongkangc Oct 13, 2025
f692d75
fmt
yongkangc Oct 13, 2025
f9e167e
refactor: replace spawn_proof_workers with ProofTaskManagerHandle
yongkangc Oct 13, 2025
bc4ecf5
addressed sync overhead
yongkangc Oct 13, 2025
45765ac
Merge branch 'yk/worker_pool_acc' into yk/pool_clean
yongkangc Oct 13, 2025
d8c8559
remove wrapper
yongkangc Oct 13, 2025
1e9874b
remove active handle
yongkangc Oct 13, 2025
1619408
fmt
yongkangc Oct 13, 2025
7041e88
Apply suggestion from @yongkangc
yongkangc Oct 14, 2025
8f0aa64
Apply suggestion from @yongkangc
yongkangc Oct 14, 2025
c5b0ec3
Merge branch 'main' into yk/pool_clean
yongkangc Oct 15, 2025
0ba03c7
fix merge conflicts
yongkangc Oct 15, 2025
8971094
rm
yongkangc Oct 15, 2025
f4eda79
import
yongkangc Oct 15, 2025
d6534d9
Apply suggestion from @shekhirin
yongkangc Oct 15, 2025
7117729
Apply suggestion from @yongkangc
yongkangc Oct 15, 2025
ea93b96
fmt
yongkangc Oct 15, 2025
7d17d88
refactor: rename ProofTaskManagerHandle to ProofWorkerHandle for clarity
yongkangc Oct 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 62 additions & 1 deletion crates/engine/primitives/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,36 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;

/// Default maximum concurrency for proof tasks
/// Default maximum concurrency for on-demand proof tasks (blinded nodes)
pub const DEFAULT_MAX_PROOF_TASK_CONCURRENCY: u64 = 256;

/// Minimum number of workers we allow configuring explicitly.
pub const MIN_WORKER_COUNT: usize = 2;

/// Returns the default number of storage worker threads based on available parallelism.
fn default_storage_worker_count() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism()
.map(|n| (n.get() * 2).clamp(MIN_WORKER_COUNT, 64))
.unwrap_or(8)
}
#[cfg(not(feature = "std"))]
{
8
}
}

/// Returns the default number of account worker threads optimized for I/O-bound coordination.
///
/// Account workers primarily coordinate storage proof collection and account trie traversal.
/// They spend significant time blocked on `receiver.recv()` calls waiting for storage proofs,
/// so we use higher concurrency (1.5x storage workers) to maximize throughput and overlap.
/// While storage workers are CPU-bound, account workers are I/O-bound coordinators.
fn default_account_worker_count() -> usize {
((default_storage_worker_count() * 3) / 2).max(MIN_WORKER_COUNT)
}

/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 10;

Expand Down Expand Up @@ -109,6 +136,10 @@ pub struct TreeConfig {
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
}

impl Default for TreeConfig {
Expand All @@ -135,6 +166,8 @@ impl Default for TreeConfig {
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: DEFAULT_PREWARM_MAX_CONCURRENCY,
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
}
}
}
Expand Down Expand Up @@ -164,7 +197,10 @@ impl TreeConfig {
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
Self {
persistence_threshold,
memory_block_buffer_target,
Expand All @@ -187,6 +223,8 @@ impl TreeConfig {
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
}
}

Expand Down Expand Up @@ -394,6 +432,7 @@ impl TreeConfig {
mut self,
max_proof_task_concurrency: u64,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
self.max_proof_task_concurrency = max_proof_task_concurrency;
self
}
Expand Down Expand Up @@ -452,4 +491,26 @@ impl TreeConfig {
pub const fn prewarm_max_concurrency(&self) -> usize {
self.prewarm_max_concurrency
}

/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
}

/// Setter for the number of storage proof worker threads.
pub const fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
self.storage_worker_count = storage_worker_count.max(MIN_WORKER_COUNT);
self
}

/// Return the number of account proof worker threads.
pub const fn account_worker_count(&self) -> usize {
self.account_worker_count
}

/// Setter for the number of account proof worker threads.
pub const fn with_account_worker_count(mut self, account_worker_count: usize) -> Self {
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
self
}
}
26 changes: 16 additions & 10 deletions crates/engine/tree/benches/state_root_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,22 @@ fn bench_state_root(c: &mut Criterion) {
},
|(genesis_hash, mut payload_processor, provider, state_updates)| {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
);
let mut handle = payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");

let mut state_hook = handle.state_hook();

Expand Down
73 changes: 40 additions & 33 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use reth_provider::{
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofTaskManager},
proof_task::{spawn_proof_workers, ProofTaskCtx},
root::ParallelStateRootError,
};
use reth_trie_sparse::{
Expand All @@ -45,7 +45,7 @@ use std::sync::{
mpsc::{self, channel, Sender},
Arc,
};
use tracing::{debug, instrument};
use tracing::{debug, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
Expand Down Expand Up @@ -166,6 +166,9 @@ where
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
///
/// Returns an error with the original transactions iterator if proof worker spawning fails.
#[allow(clippy::type_complexity)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
Expand All @@ -174,7 +177,10 @@ where
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
config: &TreeConfig,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> Result<
PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
(reth_provider::ProviderError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
Expand All @@ -185,8 +191,7 @@ where
{
let (to_sparse_trie, sparse_trie_rx) = channel();
// spawn multiproof task, save the trie input
let (trie_input, state_root_config) =
MultiProofConfig::new_from_input(consistent_view, trie_input);
let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input);
self.trie_input = Some(trie_input);

// Create and spawn the storage proof task
Expand All @@ -195,21 +200,29 @@ where
state_root_config.state_sorted.clone(),
state_root_config.prefix_sets.clone(),
);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let proof_task = ProofTaskManager::new(
let proof_handle = match spawn_proof_workers(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
consistent_view,
task_ctx,
max_proof_task_concurrency,
);
storage_worker_count,
account_worker_count,
) {
Ok(handle) => handle,
Err(error) => {
return Err((error, transactions, env, provider_builder));
}
};

// We set it to half of the proof task concurrency, because often for each multiproof we
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
let multi_proof_task = MultiProofTask::new(
state_root_config,
self.executor.clone(),
proof_task.handle(),
proof_handle.clone(),
to_sparse_trie,
max_multi_proof_task_concurrency,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
Expand Down Expand Up @@ -238,26 +251,14 @@ where
let (state_root_tx, state_root_rx) = channel();

// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);

// spawn the proof task
self.executor.spawn_blocking(move || {
if let Err(err) = proof_task.run() {
// At least log if there is an error at any point
tracing::error!(
target: "engine::root",
?err,
"Storage proof task returned an error"
);
}
});

PayloadHandle {
Ok(PayloadHandle {
to_multi_proof,
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
}
})
}

/// Spawns a task that exclusively handles cache prewarming for transaction execution.
Expand Down Expand Up @@ -857,14 +858,20 @@ mod tests {
PrecompileCacheMap::default(),
);
let provider = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
);
let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");

let mut state_hook = handle.state_hook();

Expand Down
Loading
Loading