Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/optimism/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ op-revm.workspace = true

# async
tokio.workspace = true
futures.workspace = true
tokio-stream.workspace = true
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
async-trait.workspace = true
tower.workspace = true
Expand Down
80 changes: 78 additions & 2 deletions crates/optimism/rpc/src/eth/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use crate::{OpEthApi, OpEthApiError, SequencerClient};
use alloy_consensus::TxReceipt as _;
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_eth::TransactionInfo;
use futures::StreamExt;
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
use reth_chain_state::CanonStateSubscriptions;
use reth_optimism_primitives::DepositReceipt;
use reth_primitives_traits::{SignedTransaction, SignerRecoverable};
use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable};
use reth_rpc_convert::transaction::ConvertReceiptInput;
use reth_rpc_eth_api::{
helpers::{
Expand All @@ -16,7 +18,7 @@ use reth_rpc_eth_api::{
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
RpcReceipt, TxInfoMapper,
};
use reth_rpc_eth_types::utils::recover_raw_transaction;
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_storage_api::{errors::ProviderError, ReceiptProvider};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
Expand All @@ -26,6 +28,7 @@ use std::{
future::Future,
time::Duration,
};
use tokio_stream::wrappers::WatchStream;

impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
where
Expand Down Expand Up @@ -78,6 +81,79 @@ where
Ok(hash)
}

/// Decodes and recovers the transaction and submits it to the pool.
///
/// And awaits the receipt, checking both canonical blocks and flashblocks for faster
/// confirmation.
fn send_raw_transaction_sync(
&self,
tx: Bytes,
) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
where
Self: LoadReceipt + 'static,
{
let this = self.clone();
let timeout_duration = self.send_raw_transaction_sync_timeout();
async move {
let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
let mut canonical_stream = this.provider().canonical_state_stream();
let flashblock_rx = this.pending_block_rx();
let mut flashblock_stream = flashblock_rx.map(WatchStream::new);

tokio::time::timeout(timeout_duration, async {
loop {
tokio::select! {
// Listen for regular canonical block updates for inclusion
canonical_notification = canonical_stream.next() => {
if let Some(notification) = canonical_notification {
let chain = notification.committed();
for block in chain.blocks_iter() {
if block.body().contains_transaction(&hash) {
if let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
}
}
} else {
// Canonical stream ended
break;
}
}
// check if the tx was preconfirmed in a new flashblock
_flashblock_update = async {
if let Some(ref mut stream) = flashblock_stream {
stream.next().await
} else {
futures::future::pending().await
}
} => {
// Check flashblocks for faster confirmation (Optimism-specific)
if let Ok(Some(pending_block)) = this.pending_flashblock() {
let block_and_receipts = pending_block.into_block_and_receipts();
if block_and_receipts.block.body().contains_transaction(&hash) {
if let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
}
}
}
}
}
Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
hash,
duration: timeout_duration,
}))
})
.await
.unwrap_or_else(|_elapsed| {
Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
hash,
duration: timeout_duration,
}))
})
}
}

/// Returns the transaction receipt for the given hash.
///
/// With flashblocks, we should also lookup the pending block for the transaction
Expand Down
Loading