Skip to content
Merged
Show file tree
Hide file tree
Changes from 91 commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
436dbd7
feat(storage): introduce storage proof worker pool
yongkangc Oct 7, 2025
fbeec50
fmt, clippy
yongkangc Oct 7, 2025
ab82386
add fallback
yongkangc Oct 7, 2025
13891ad
fix comments
yongkangc Oct 7, 2025
d4e0adb
refactor(metrics): remove unused storage proof metrics from ProofTask…
yongkangc Oct 7, 2025
2957afa
refactor(proof_task): improve documentation and rename variables for …
yongkangc Oct 7, 2025
800dcf6
refactor(proof_task): streamline documentation and clarify task manag…
yongkangc Oct 7, 2025
29d48d4
refactor(config): remove storage proof worker configuration
yongkangc Oct 7, 2025
3fb97c6
refactor(proof_task): enhance comments and adjust queue capacity logic
yongkangc Oct 7, 2025
5779b86
disable max concurrency
yongkangc Oct 7, 2025
0e33837
nits
yongkangc Oct 7, 2025
3bcbc71
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 7, 2025
4a67076
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 7, 2025
b2d5bcc
using unbounded queue
yongkangc Oct 7, 2025
8f4e3a1
rm comment
yongkangc Oct 7, 2025
b4bf193
refactor(proof_task): optimize storage proof computation by reusing c…
yongkangc Oct 7, 2025
6282d2e
propogate error up
yongkangc Oct 7, 2025
838dc67
reduce scope of pr - exclude all accs
yongkangc Oct 7, 2025
5897945
fmt, clippy
yongkangc Oct 7, 2025
6b5de7c
fmt
yongkangc Oct 7, 2025
05e0eb8
refactor(proof_task): consolidate blinded storage node with storage p…
yongkangc Oct 7, 2025
4829de9
rm comment
yongkangc Oct 7, 2025
6472cfe
simplify worker concurrency
yongkangc Oct 7, 2025
61ecc9a
bump to error!
yongkangc Oct 7, 2025
30f6fda
Update crates/engine/tree/src/tree/payload_processor/mod.rs
yongkangc Oct 7, 2025
4680336
handle sending error back
yongkangc Oct 7, 2025
58d6f8b
fmt
yongkangc Oct 7, 2025
59b0353
fix fmt
yongkangc Oct 7, 2025
93c67e8
Enhance TreeConfig with storage worker count configuration
yongkangc Oct 7, 2025
1954502
update message
yongkangc Oct 7, 2025
2429320
refactor(proof_task): use impl bound
yongkangc Oct 8, 2025
1902b43
make spawning falliable
yongkangc Oct 8, 2025
8fb0dd1
remove error log, as we propogate up
yongkangc Oct 8, 2025
e0010d7
Apply suggestion from @Copilot
yongkangc Oct 8, 2025
53cd4ba
use expect instead of unwrap
yongkangc Oct 8, 2025
e854628
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
3b17cc7
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
6c89cf4
consolidate
yongkangc Oct 8, 2025
c5f6eb9
removed the unnecessary remaining_concurrency variable allocation
yongkangc Oct 8, 2025
af73c7a
clippy
yongkangc Oct 8, 2025
a8e52bc
Apply suggestion from @yongkangc
yongkangc Oct 8, 2025
c48b328
address brian's pr
yongkangc Oct 8, 2025
3eff3e2
Merge branch 'main' into yk/worker_pool_storage_2
yongkangc Oct 8, 2025
4934099
feat: added initial structs for acc proofs support for task manager
yongkangc Oct 8, 2025
d847799
feat: add account worker count to TreeConfig and update task manager
yongkangc Oct 8, 2025
4c9c2ee
feat: implement account worker functionality in ProofTaskManager
yongkangc Oct 8, 2025
2f10f99
refactor: enhance ProofTaskManager with account worker integration
yongkangc Oct 8, 2025
f5538b2
refactor: slight cleanup
yongkangc Oct 8, 2025
aeff6d2
refactor spawning workers into a generic function
yongkangc Oct 8, 2025
c4c2b4b
refactor: integrate account proof task handling in MultiproofManager
yongkangc Oct 8, 2025
4e30bba
fmt
yongkangc Oct 8, 2025
509898b
refactor: implement account worker loop
yongkangc Oct 8, 2025
7892d54
pass in `missed_leaves_storage_roots`
yongkangc Oct 8, 2025
26b8ca8
refactor: remove db
yongkangc Oct 8, 2025
c48085d
remove db
yongkangc Oct 8, 2025
66bf694
refactor: streamline account multiproof generation
yongkangc Oct 8, 2025
ee16627
refactor: add mapping
yongkangc Oct 8, 2025
7fbd172
refactor: enhance account worker loop and storage proof collection
yongkangc Oct 8, 2025
0884e15
refactor: improve account multiproof processing and storage proof han…
yongkangc Oct 8, 2025
d76cc86
refactor: optimize account multiproof processing and error handling
yongkangc Oct 8, 2025
8e45538
refactor: enhance account worker management and task routing
yongkangc Oct 8, 2025
8be550a
refactor: improve task management in ProofTaskManager
yongkangc Oct 8, 2025
087e875
refactor: remove unused blinded account node retrieval function
yongkangc Oct 8, 2025
0646f14
fmt
yongkangc Oct 8, 2025
886cb6f
changed to error
yongkangc Oct 8, 2025
e51707a
refactor: simplify error handling in proof task management
yongkangc Oct 8, 2025
4941071
rm comment
yongkangc Oct 8, 2025
d6401ab
fix rebase conflict
yongkangc Oct 8, 2025
9fa34b0
fmt
yongkangc Oct 8, 2025
d0e2ba1
removed generic based on brian's suggestion
yongkangc Oct 8, 2025
7efff3d
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
f7cd93f
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 8, 2025
f823c6b
Refactor error handling in StorageWorkerJob to use a consistent error…
yongkangc Oct 8, 2025
9c08aed
Refactor error handling in StorageWorkerJob to use structured error t…
yongkangc Oct 8, 2025
7f9ec06
fmt, clipy
yongkangc Oct 8, 2025
6d41352
fix
yongkangc Oct 8, 2025
dc5ba15
moved `build_account_multiproof_with_storage_roots` to prooftask.rs
yongkangc Oct 9, 2025
14b6356
refactor: remove generic type parameter from `ProofTaskManagerHandle`
yongkangc Oct 9, 2025
5e1ca83
fmt
yongkangc Oct 9, 2025
3b815f6
refactor: rename storage proof task handle for clarity
yongkangc Oct 9, 2025
7df3488
refactor: simplify `ProofTaskManager` by removing generic type parameter
yongkangc Oct 9, 2025
e418bb1
refactor: improve documentation for proof task handles in `Multiproof…
yongkangc Oct 9, 2025
498cd20
Merge branch 'yk/worker_pool_storage_2' into yk/worker_pool_acc
yongkangc Oct 9, 2025
1bbfc7c
clippy
yongkangc Oct 9, 2025
a654cf8
rm clone from `StorageProofInput`
yongkangc Oct 9, 2025
3c00e98
refactor: use receiver to allow Account worker and storage workers ru…
yongkangc Oct 9, 2025
e427485
Removed the unused _proof_tx parameter from queue_storage_proofs
yongkangc Oct 9, 2025
5647e22
avoid extra alloc on map
yongkangc Oct 9, 2025
b4d109b
clippy
yongkangc Oct 9, 2025
84ad218
refactor: update account worker loop to use crossbeam sender for stor…
yongkangc Oct 9, 2025
9e177f4
feat: add default account worker count for optimized I/O-bound coordi…
yongkangc Oct 9, 2025
4596366
refactor: clarify comments for proof task handles in MultiproofManager
yongkangc Oct 9, 2025
19a9e8b
comment
yongkangc Oct 9, 2025
2091f0d
expect workers to be avaliable
yongkangc Oct 9, 2025
4ca404e
bump up workers
yongkangc Oct 9, 2025
d66ed61
cli flag for storage
yongkangc Oct 9, 2025
4aba3de
docs: update CLI reference for storage-worker-count flag
yongkangc Oct 10, 2025
8e64738
docs: clarify storage-worker-count uses Tokio blocking pool
yongkangc Oct 10, 2025
5d10934
feat: add storage and account worker count configuration options
yongkangc Oct 10, 2025
05f177a
added a todo
yongkangc Oct 10, 2025
aae5c7a
refactor: simplify account multiproof function parameters
yongkangc Oct 10, 2025
1bc2862
move the AccountMultiproofParams struct to just below AccountMultipro…
yongkangc Oct 10, 2025
5a69925
Merge branch 'yk/worker_pool_storage_2' into yk/worker_pool_acc
yongkangc Oct 10, 2025
f23cfab
refactor: addressed dan's comment on making structs that encapsulate…
yongkangc Oct 10, 2025
870938f
Merge branch 'main' into yk/worker_pool_acc
yongkangc Oct 10, 2025
14def07
fmt
yongkangc Oct 10, 2025
44bca43
made count same as storage worker
yongkangc Oct 10, 2025
d1eb0ec
Update crates/trie/parallel/src/proof_task.rs
yongkangc Oct 10, 2025
4e00944
merge
yongkangc Oct 10, 2025
3e957a5
refactor(tree): remove unused Factory generic from multiproof system …
yongkangc Oct 13, 2025
18ff58d
fix clippy
yongkangc Oct 13, 2025
f692d75
fmt
yongkangc Oct 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 58 additions & 1 deletion crates/engine/primitives/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,32 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;

/// Default maximum concurrency for proof tasks
/// Default maximum concurrency for on-demand proof tasks (blinded nodes)
pub const DEFAULT_MAX_PROOF_TASK_CONCURRENCY: u64 = 256;

/// Returns the default number of storage worker threads based on available parallelism.
/// Defaults to half of available parallelism, clamped between 2 and 8.
fn default_storage_worker_count() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map(|n| (n.get() / 2).clamp(2, 8)).unwrap_or(4)
}
#[cfg(not(feature = "std"))]
{
4
}
}

/// Returns the default number of account worker threads optimized for I/O-bound coordination.
///
/// Account workers primarily coordinate storage proof collection and account trie traversal.
/// They spend significant time blocked on `receiver.recv()` calls waiting for storage proofs,
/// so we use higher concurrency (1.5x storage workers) to maximize throughput and overlap.
/// While storage workers are CPU-bound, account workers are I/O-bound coordinators.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment true? I would expect the storage workers to be I/O bound as well

Copy link
Member Author

@yongkangc yongkangc Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rjected looking at storage workers:

implementation:

pub fn storage_multiproof(

storage worker does:

  1. Database I/O: Creates cursors and walks the storage trie using TrieNodeIter (reads trie nodes from database)
  2. CPU Work: For each node, calls hash_builder.add_branch() or hash_builder.add_leaf() (cryptographic hashing operations)
  3. CPU Work: Computes final root hash with hash_builder.root() (more cryptographic hashing)

so imo storage workers does perform database I/O (reading trie nodes), tho it seems the bottleneck is the CPU-intensive Merkle proof computation, not the I/O.

wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this for simplicity, and made the count the same as storage worker

fn default_account_worker_count() -> usize {
(default_storage_worker_count() * 3) / 2
}

/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 10;

Expand Down Expand Up @@ -109,6 +132,10 @@ pub struct TreeConfig {
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
}

impl Default for TreeConfig {
Expand All @@ -135,6 +162,8 @@ impl Default for TreeConfig {
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: DEFAULT_PREWARM_MAX_CONCURRENCY,
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
}
}
}
Expand Down Expand Up @@ -164,7 +193,10 @@ impl TreeConfig {
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
Self {
persistence_threshold,
memory_block_buffer_target,
Expand All @@ -187,6 +219,8 @@ impl TreeConfig {
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
}
}

Expand Down Expand Up @@ -394,6 +428,7 @@ impl TreeConfig {
mut self,
max_proof_task_concurrency: u64,
) -> Self {
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
self.max_proof_task_concurrency = max_proof_task_concurrency;
self
}
Expand Down Expand Up @@ -452,4 +487,26 @@ impl TreeConfig {
pub const fn prewarm_max_concurrency(&self) -> usize {
self.prewarm_max_concurrency
}

/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count
}

/// Setter for the number of storage proof worker threads.
pub const fn with_storage_worker_count(mut self, storage_worker_count: usize) -> Self {
self.storage_worker_count = storage_worker_count;
self
}

/// Return the number of account proof worker threads.
pub const fn account_worker_count(&self) -> usize {
self.account_worker_count
}

/// Setter for the number of account proof worker threads.
pub const fn with_account_worker_count(mut self, account_worker_count: usize) -> Self {
self.account_worker_count = account_worker_count;
self
}
}
26 changes: 16 additions & 10 deletions crates/engine/tree/benches/state_root_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,22 @@ fn bench_state_root(c: &mut Criterion) {
},
|(genesis_hash, mut payload_processor, provider, state_updates)| {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
);
let mut handle = payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");

let mut state_hook = handle.state_hook();

Expand Down
51 changes: 36 additions & 15 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::sync::{
mpsc::{self, channel, Sender},
Arc,
};
use tracing::{debug, instrument};
use tracing::{debug, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
Expand Down Expand Up @@ -166,6 +166,10 @@ where
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
///
/// Returns an error with the original transactions iterator if the proof task manager fails to
/// initialize.
#[allow(clippy::type_complexity)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
Expand All @@ -174,7 +178,10 @@ where
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
config: &TreeConfig,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> Result<
PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
(reth_provider::ProviderError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
Expand All @@ -195,13 +202,21 @@ where
state_root_config.state_sorted.clone(),
state_root_config.prefix_sets.clone(),
);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let proof_task = ProofTaskManager::new(
let proof_task = match ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
max_proof_task_concurrency,
);
storage_worker_count,
account_worker_count,
) {
Ok(task) => task,
Err(error) => {
return Err((error, transactions, env, provider_builder));
}
};

// We set it to half of the proof task concurrency, because often for each multiproof we
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
Expand Down Expand Up @@ -252,12 +267,12 @@ where
}
});

PayloadHandle {
Ok(PayloadHandle {
to_multi_proof,
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
}
})
}

/// Spawns a task that exclusively handles cache prewarming for transaction execution.
Expand Down Expand Up @@ -857,14 +872,20 @@ mod tests {
PrecompileCacheMap::default(),
);
let provider = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
);
let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
.expect("failed to spawn payload processor");

let mut state_hook = handle.state_hook();

Expand Down
Loading