Skip to content

Commit d2070f4

Browse files
feat: wait for new blocks when build is in progress (#18831)
Co-authored-by: Roman Hodulák <roman.hodulak@polyglot-software.com>
1 parent 6f96a32 commit d2070f4

File tree

5 files changed

+127
-45
lines changed

5 files changed

+127
-45
lines changed

crates/optimism/flashblocks/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub use payload::{
44
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, FlashBlockDecoder,
55
Metadata,
66
};
7-
pub use service::FlashBlockService;
7+
pub use service::{FlashBlockBuildInfo, FlashBlockService};
88
pub use ws::{WsConnect, WsFlashBlockStream};
99

1010
mod consensus;
@@ -28,3 +28,6 @@ pub type PendingBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingFlashBlo
2828
/// [`FlashBlock`]: crate::FlashBlock
2929
pub type FlashBlockCompleteSequenceRx =
3030
tokio::sync::broadcast::Receiver<FlashBlockCompleteSequence>;
31+
32+
/// Receiver that signals whether a [`FlashBlock`] is currently being built.
33+
pub type InProgressFlashBlockRx = tokio::sync::watch::Receiver<Option<FlashBlockBuildInfo>>;

crates/optimism/flashblocks/src/service.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::{
22
sequence::FlashBlockPendingSequence,
33
worker::{BuildArgs, FlashBlockBuilder},
4-
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, PendingFlashBlock,
4+
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx,
5+
PendingFlashBlock,
56
};
67
use alloy_eips::eip2718::WithEncoded;
78
use alloy_primitives::B256;
@@ -21,7 +22,10 @@ use std::{
2122
task::{ready, Context, Poll},
2223
time::Instant,
2324
};
24-
use tokio::{pin, sync::oneshot};
25+
use tokio::{
26+
pin,
27+
sync::{oneshot, watch},
28+
};
2529
use tracing::{debug, trace, warn};
2630

2731
pub(crate) const FB_STATE_ROOT_FROM_INDEX: usize = 9;
@@ -48,11 +52,25 @@ pub struct FlashBlockService<
4852
/// when fb received on top of the same block. Avoid redundant I/O across multiple
4953
/// executions within the same block.
5054
cached_state: Option<(B256, CachedReads)>,
55+
/// Signals when a block build is in progress
56+
in_progress_tx: watch::Sender<Option<FlashBlockBuildInfo>>,
57+
/// `FlashBlock` service's metrics
5158
metrics: FlashBlockServiceMetrics,
5259
/// Enable state root calculation from flashblock with index [`FB_STATE_ROOT_FROM_INDEX`]
5360
compute_state_root: bool,
5461
}
5562

63+
/// Information for a flashblock currently built
64+
#[derive(Debug, Clone, Copy)]
65+
pub struct FlashBlockBuildInfo {
66+
/// Parent block hash
67+
pub parent_hash: B256,
68+
/// Flashblock index within the current block's sequence
69+
pub index: u64,
70+
/// Block number of the flashblock being built.
71+
pub block_number: u64,
72+
}
73+
5674
impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
5775
where
5876
N: NodePrimitives,
@@ -73,6 +91,7 @@ where
7391
{
7492
/// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
7593
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
94+
let (in_progress_tx, _) = watch::channel(None);
7695
Self {
7796
rx,
7897
current: None,
@@ -83,6 +102,7 @@ where
83102
spawner,
84103
job: None,
85104
cached_state: None,
105+
in_progress_tx,
86106
metrics: FlashBlockServiceMetrics::default(),
87107
compute_state_root: false,
88108
}
@@ -99,6 +119,11 @@ where
99119
self.blocks.subscribe_block_sequence()
100120
}
101121

122+
/// Returns a receiver that signals when a flashblock is being built.
123+
pub fn subscribe_in_progress(&self) -> InProgressFlashBlockRx {
124+
self.in_progress_tx.subscribe()
125+
}
126+
102127
/// Drives the services and sends new blocks to the receiver
103128
///
104129
/// Note: this should be spawned
@@ -218,6 +243,8 @@ where
218243
};
219244
// reset job
220245
this.job.take();
246+
// No build in progress
247+
let _ = this.in_progress_tx.send(None);
221248

222249
if let Some((now, result)) = result {
223250
match result {
@@ -293,6 +320,13 @@ where
293320
if let Some(args) = this.build_args() {
294321
let now = Instant::now();
295322

323+
let fb_info = FlashBlockBuildInfo {
324+
parent_hash: args.base.parent_hash,
325+
index: args.last_flashblock_index,
326+
block_number: args.base.block_number,
327+
};
328+
// Signal that a flashblock build has started with build metadata
329+
let _ = this.in_progress_tx.send(Some(fb_info));
296330
let (tx, rx) = oneshot::channel();
297331
let builder = this.builder.clone();
298332

crates/optimism/rpc/src/eth/mod.rs

Lines changed: 83 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
OpEthApiError, SequencerClient,
1414
};
1515
use alloy_consensus::BlockHeader;
16-
use alloy_primitives::U256;
16+
use alloy_primitives::{B256, U256};
1717
use eyre::WrapErr;
1818
use op_alloy_network::Optimism;
1919
pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
@@ -23,8 +23,8 @@ use reth_evm::ConfigureEvm;
2323
use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
2424
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
2525
use reth_optimism_flashblocks::{
26-
ExecutionPayloadBaseV1, FlashBlockCompleteSequenceRx, FlashBlockService, PendingBlockRx,
27-
WsFlashBlockStream,
26+
ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService,
27+
InProgressFlashBlockRx, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
2828
};
2929
use reth_rpc::eth::{core::EthApiInner, DevSigner};
3030
use reth_rpc_eth_api::{
@@ -43,10 +43,18 @@ use reth_tasks::{
4343
pool::{BlockingTaskGuard, BlockingTaskPool},
4444
TaskSpawner,
4545
};
46-
use std::{fmt, fmt::Formatter, marker::PhantomData, sync::Arc, time::Instant};
47-
use tokio::sync::watch;
46+
use std::{
47+
fmt::{self, Formatter},
48+
marker::PhantomData,
49+
sync::Arc,
50+
time::Duration,
51+
};
52+
use tokio::{sync::watch, time};
4853
use tracing::info;
4954

55+
/// Maximum duration to wait for a fresh flashblock when one is being built.
56+
const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57+
5058
/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
5159
pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
5260

@@ -79,13 +87,15 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
7987
min_suggested_priority_fee: U256,
8088
pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
8189
flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
90+
in_progress_rx: Option<InProgressFlashBlockRx>,
8291
) -> Self {
8392
let inner = Arc::new(OpEthApiInner {
8493
eth_api,
8594
sequencer_client,
8695
min_suggested_priority_fee,
8796
pending_block_rx,
8897
flashblock_rx,
98+
in_progress_rx,
8999
});
90100
Self { inner }
91101
}
@@ -109,15 +119,57 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
109119
self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
110120
}
111121

122+
/// Returns information about the flashblock currently being built, if any.
123+
fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
124+
self.inner.in_progress_rx.as_ref().and_then(|rx| *rx.borrow())
125+
}
126+
127+
/// Extracts pending block if it matches the expected parent hash.
128+
fn extract_matching_block(
129+
&self,
130+
block: Option<&PendingFlashBlock<N::Primitives>>,
131+
parent_hash: B256,
132+
) -> Option<PendingBlock<N::Primitives>> {
133+
block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
134+
}
135+
112136
/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
113137
pub const fn builder() -> OpEthApiBuilder<Rpc> {
114138
OpEthApiBuilder::new()
115139
}
116140

141+
/// Awaits a fresh flashblock if one is being built, otherwise returns current.
142+
async fn flashblock(
143+
&self,
144+
parent_hash: B256,
145+
) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
146+
let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
147+
148+
// Check if a flashblock is being built
149+
if let Some(build_info) = self.flashblock_build_info() {
150+
let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
151+
152+
// Check if this is the first flashblock or the next consecutive index
153+
let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
154+
155+
// Wait only for relevant flashblocks: matching parent and next in sequence
156+
if build_info.parent_hash == parent_hash && is_next_index {
157+
let mut rx_clone = rx.clone();
158+
// Wait up to MAX_FLASHBLOCK_WAIT_DURATION for a new flashblock to arrive
159+
let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
160+
}
161+
}
162+
163+
// Fall back to current block
164+
Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
165+
}
166+
117167
/// Returns a [`PendingBlock`] that is built out of flashblocks.
118168
///
119169
/// If flashblocks receiver is not set, then it always returns `None`.
120-
pub fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
170+
///
171+
/// It may wait up to 50ms for a fresh flashblock if one is currently being built.
172+
pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
121173
where
122174
OpEthApiError: FromEvmError<N::Evm>,
123175
Rpc: RpcConvert<Primitives = N::Primitives>,
@@ -128,21 +180,7 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
128180
PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
129181
};
130182

131-
let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
132-
let pending_block = rx.borrow();
133-
let Some(pending_block) = pending_block.as_ref() else { return Ok(None) };
134-
135-
let now = Instant::now();
136-
137-
// Is the pending block not expired and latest is its parent?
138-
if pending.evm_env.block_env.number == U256::from(pending_block.block().number()) &&
139-
parent.hash() == pending_block.block().parent_hash() &&
140-
now <= pending_block.expires_at
141-
{
142-
return Ok(Some(pending_block.pending.clone()));
143-
}
144-
145-
Ok(None)
183+
self.flashblock(parent.hash()).await
146184
}
147185
}
148186

@@ -330,6 +368,8 @@ pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
330368
///
331369
/// If set, then it provides sequences of flashblock built.
332370
flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
371+
/// Receiver that signals when a flashblock is being built
372+
in_progress_rx: Option<InProgressFlashBlockRx>,
333373
}
334374

335375
impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
@@ -465,24 +505,28 @@ where
465505
None
466506
};
467507

468-
let rxs = if let Some(ws_url) = flashblocks_url {
469-
info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
470-
let (tx, pending_block_rx) = watch::channel(None);
471-
let stream = WsFlashBlockStream::new(ws_url);
472-
let service = FlashBlockService::new(
473-
stream,
474-
ctx.components.evm_config().clone(),
475-
ctx.components.provider().clone(),
476-
ctx.components.task_executor().clone(),
477-
);
478-
let flashblock_rx = service.subscribe_block_sequence();
479-
ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
480-
Some((pending_block_rx, flashblock_rx))
481-
} else {
482-
None
483-
};
508+
let (pending_block_rx, flashblock_rx, in_progress_rx) =
509+
if let Some(ws_url) = flashblocks_url {
510+
info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
511+
512+
let (tx, pending_rx) = watch::channel(None);
513+
let stream = WsFlashBlockStream::new(ws_url);
514+
let service = FlashBlockService::new(
515+
stream,
516+
ctx.components.evm_config().clone(),
517+
ctx.components.provider().clone(),
518+
ctx.components.task_executor().clone(),
519+
);
520+
521+
let flashblock_rx = service.subscribe_block_sequence();
522+
let in_progress_rx = service.subscribe_in_progress();
523+
524+
ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
484525

485-
let (pending_block_rx, flashblock_rx) = rxs.unzip();
526+
(Some(pending_rx), Some(flashblock_rx), Some(in_progress_rx))
527+
} else {
528+
(None, None, None)
529+
};
486530

487531
let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
488532

@@ -492,6 +536,7 @@ where
492536
U256::from(min_suggested_priority_fee),
493537
pending_block_rx,
494538
flashblock_rx,
539+
in_progress_rx,
495540
))
496541
}
497542
}

crates/optimism/rpc/src/eth/pending_block.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ where
4242
async fn local_pending_block(
4343
&self,
4444
) -> Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error> {
45-
if let Ok(Some(pending)) = self.pending_flashblock() {
45+
if let Ok(Some(pending)) = self.pending_flashblock().await {
4646
return Ok(Some(pending.into_block_and_receipts()));
4747
}
4848

@@ -70,7 +70,7 @@ where
7070
where
7171
Self: SpawnBlocking,
7272
{
73-
let Ok(Some(pending_block)) = self.pending_flashblock() else {
73+
let Ok(Some(pending_block)) = self.pending_flashblock().await else {
7474
return Ok(None);
7575
};
7676

crates/optimism/rpc/src/eth/transaction.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ where
127127
}
128128
} => {
129129
// Check flashblocks for faster confirmation (Optimism-specific)
130-
if let Ok(Some(pending_block)) = this.pending_flashblock() {
130+
if let Ok(Some(pending_block)) = this.pending_flashblock().await {
131131
let block_and_receipts = pending_block.into_block_and_receipts();
132132
if block_and_receipts.block.body().contains_transaction(&hash)
133133
&& let Some(receipt) = this.transaction_receipt(hash).await? {
@@ -168,7 +168,7 @@ where
168168

169169
if tx_receipt.is_none() {
170170
// if flashblocks are supported, attempt to find id from the pending block
171-
if let Ok(Some(pending_block)) = this.pending_flashblock() {
171+
if let Ok(Some(pending_block)) = this.pending_flashblock().await {
172172
let block_and_receipts = pending_block.into_block_and_receipts();
173173
if let Some((tx, receipt)) =
174174
block_and_receipts.find_transaction_and_receipt_by_hash(hash)

0 commit comments

Comments
 (0)