diff --git a/Cargo.lock b/Cargo.lock index d4b6c54f722..99c736fc96d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9479,6 +9479,7 @@ dependencies = [ "async-trait", "derive_more", "eyre", + "futures", "jsonrpsee", "jsonrpsee-core", "jsonrpsee-types", @@ -9518,6 +9519,7 @@ dependencies = [ "serde_json", "thiserror 2.0.16", "tokio", + "tokio-stream", "tower", "tracing", ] diff --git a/crates/optimism/rpc/Cargo.toml b/crates/optimism/rpc/Cargo.toml index 2a90a8ca580..acbc491f648 100644 --- a/crates/optimism/rpc/Cargo.toml +++ b/crates/optimism/rpc/Cargo.toml @@ -60,6 +60,8 @@ op-revm.workspace = true # async tokio.workspace = true +futures.workspace = true +tokio-stream.workspace = true reqwest = { workspace = true, features = ["rustls-tls-native-roots"] } async-trait.workspace = true tower.workspace = true diff --git a/crates/optimism/rpc/src/eth/transaction.rs b/crates/optimism/rpc/src/eth/transaction.rs index a4326891f71..27d1ad831bf 100644 --- a/crates/optimism/rpc/src/eth/transaction.rs +++ b/crates/optimism/rpc/src/eth/transaction.rs @@ -4,9 +4,11 @@ use crate::{OpEthApi, OpEthApiError, SequencerClient}; use alloy_consensus::TxReceipt as _; use alloy_primitives::{Bytes, B256}; use alloy_rpc_types_eth::TransactionInfo; +use futures::StreamExt; use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction}; +use reth_chain_state::CanonStateSubscriptions; use reth_optimism_primitives::DepositReceipt; -use reth_primitives_traits::{SignedTransaction, SignerRecoverable}; +use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable}; use reth_rpc_convert::transaction::ConvertReceiptInput; use reth_rpc_eth_api::{ helpers::{ @@ -16,7 +18,7 @@ use reth_rpc_eth_api::{ try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore, RpcReceipt, TxInfoMapper, }; -use reth_rpc_eth_types::utils::recover_raw_transaction; +use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError}; use reth_storage_api::{errors::ProviderError, ReceiptProvider}; use reth_transaction_pool::{ AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool, @@ -26,6 +28,7 @@ use std::{ future::Future, time::Duration, }; +use tokio_stream::wrappers::WatchStream; impl EthTransactions for OpEthApi where @@ -78,6 +81,79 @@ where Ok(hash) } + /// Decodes and recovers the transaction and submits it to the pool. + /// + /// And awaits the receipt, checking both canonical blocks and flashblocks for faster + /// confirmation. + fn send_raw_transaction_sync( + &self, + tx: Bytes, + ) -> impl Future, Self::Error>> + Send + where + Self: LoadReceipt + 'static, + { + let this = self.clone(); + let timeout_duration = self.send_raw_transaction_sync_timeout(); + async move { + let hash = EthTransactions::send_raw_transaction(&this, tx).await?; + let mut canonical_stream = this.provider().canonical_state_stream(); + let flashblock_rx = this.pending_block_rx(); + let mut flashblock_stream = flashblock_rx.map(WatchStream::new); + + tokio::time::timeout(timeout_duration, async { + loop { + tokio::select! { + // Listen for regular canonical block updates for inclusion + canonical_notification = canonical_stream.next() => { + if let Some(notification) = canonical_notification { + let chain = notification.committed(); + for block in chain.blocks_iter() { + if block.body().contains_transaction(&hash) { + if let Some(receipt) = this.transaction_receipt(hash).await? { + return Ok(receipt); + } + } + } + } else { + // Canonical stream ended + break; + } + } + // check if the tx was preconfirmed in a new flashblock + _flashblock_update = async { + if let Some(ref mut stream) = flashblock_stream { + stream.next().await + } else { + futures::future::pending().await + } + } => { + // Check flashblocks for faster confirmation (Optimism-specific) + if let Ok(Some(pending_block)) = this.pending_flashblock() { + let block_and_receipts = pending_block.into_block_and_receipts(); + if block_and_receipts.block.body().contains_transaction(&hash) { + if let Some(receipt) = this.transaction_receipt(hash).await? { + return Ok(receipt); + } + } + } + } + } + } + Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout { + hash, + duration: timeout_duration, + })) + }) + .await + .unwrap_or_else(|_elapsed| { + Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout { + hash, + duration: timeout_duration, + })) + }) + } + } + /// Returns the transaction receipt for the given hash. /// /// With flashblocks, we should also lookup the pending block for the transaction