diff --git a/crates/rpc/rpc-convert/src/lib.rs b/crates/rpc/rpc-convert/src/lib.rs index 9844b17b604..8001e69f2f8 100644 --- a/crates/rpc/rpc-convert/src/lib.rs +++ b/crates/rpc/rpc-convert/src/lib.rs @@ -21,8 +21,8 @@ pub use fees::{CallFees, CallFeesError}; pub use receipt::TryFromReceiptResponse; pub use rpc::*; pub use transaction::{ - EthTxEnvError, IntoRpcTx, RpcConvert, RpcConverter, TransactionConversionError, - TryFromTransactionResponse, TryIntoSimTx, TxInfoMapper, + build_convert_receipt_inputs, EthTxEnvError, IntoRpcTx, RpcConvert, RpcConverter, + TransactionConversionError, TryFromTransactionResponse, TryIntoSimTx, TxInfoMapper, }; #[cfg(feature = "op")] diff --git a/crates/rpc/rpc-convert/src/transaction.rs b/crates/rpc/rpc-convert/src/transaction.rs index a89104bcbaf..924d2d50364 100644 --- a/crates/rpc/rpc-convert/src/transaction.rs +++ b/crates/rpc/rpc-convert/src/transaction.rs @@ -5,7 +5,9 @@ use crate::{ RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq, RpcTypes, }; use alloy_consensus::{ - error::ValueError, transaction::Recovered, EthereumTxEnvelope, Sealable, TxEip4844, + error::ValueError, + transaction::{Recovered, TxHashRef}, + BlockHeader, EthereumTxEnvelope, Sealable, TxEip4844, TxReceipt, }; use alloy_network::Network; use alloy_primitives::{Address, TxKind, U256}; @@ -1100,6 +1102,63 @@ impl TryFromTransactionResponse } } +/// Helper function to build `ConvertReceiptInput` from block and receipts. +/// +/// This function takes a block and its receipts and creates the input needed for receipt +/// conversion. It handles the complex logic of calculating gas used and log indices for each +/// transaction. +/// +/// This is extracted from the pattern used in `eth_getBlockReceipts` to enable reuse in +/// subscription streams. +pub fn build_convert_receipt_inputs<'a, N: NodePrimitives>( + block: &'a reth_primitives_traits::RecoveredBlock, + receipts: &'a [N::Receipt], +) -> Vec> +where + N::SignedTx: TxHashRef, +{ + let block_number = block.header().number(); + let base_fee = block.header().base_fee_per_gas(); + let block_hash = block.hash(); + let excess_blob_gas = block.header().excess_blob_gas(); + let timestamp = block.header().timestamp(); + let mut gas_used = 0; + let mut next_log_index = 0; + + block + .transactions_recovered() + .zip(receipts.iter()) + .enumerate() + .map(|(idx, (tx, receipt))| { + let meta = reth_primitives_traits::TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }; + + let cumulative_gas_used = receipt.cumulative_gas_used(); + let logs_len = receipt.logs().len(); + + let input = ConvertReceiptInput { + tx, + gas_used: cumulative_gas_used - gas_used, + next_log_index, + meta, + receipt: receipt.clone(), + }; + + gas_used = cumulative_gas_used; + next_log_index += logs_len; + + input + }) + .collect() +} + #[cfg(feature = "op")] impl TryFromTransactionResponse for reth_optimism_primitives::OpTransactionSigned diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 985cdf3129e..bfb4c64741b 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt}; use alloy_primitives::{TxHash, U256}; use alloy_rpc_types_eth::{ pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata}, @@ -13,7 +14,8 @@ use jsonrpsee::{ }; use reth_chain_state::CanonStateSubscriptions; use reth_network_api::NetworkInfo; -use reth_primitives_traits::NodePrimitives; +use reth_primitives_traits::{NodePrimitives, TransactionMeta}; +use reth_rpc_convert::{build_convert_receipt_inputs, transaction::ConvertReceiptInput}; use reth_rpc_eth_api::{ pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction, }; @@ -55,17 +57,16 @@ impl EthPubSub { } } -impl EthPubSub +impl>, Eth> EthPubSub where Eth: RpcNodeCore< Provider: BlockNumReader + CanonStateSubscriptions, Pool: TransactionPool, Network: NetworkInfo, > + EthApiTypes< - RpcConvert: RpcConvert< - Primitives: NodePrimitives>, - >, + RpcConvert: RpcConvert, >, + PoolConsensusTx: TxHashRef, { /// Returns the current sync status for the `syncing` subscription pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus { @@ -94,6 +95,14 @@ where self.inner.log_stream(filter) } + /// Returns a stream that yields all transaction receipts from new blocks. + pub fn transaction_receipts_stream( + &self, + hashes: Option>, + ) -> impl Stream> { + self.inner.transaction_receipts_stream(hashes) + } + /// The actual handler for an accepted [`EthPubSub::subscribe`] call. pub async fn handle_accepted( &self, @@ -209,17 +218,17 @@ where } #[async_trait::async_trait] -impl EthPubSubApiServer> for EthPubSub +impl>, Eth> + EthPubSubApiServer> for EthPubSub where Eth: RpcNodeCore< - Provider: BlockNumReader + CanonStateSubscriptions, + Provider: BlockNumReader + CanonStateSubscriptions, Pool: TransactionPool, Network: NetworkInfo, > + EthApiTypes< - RpcConvert: RpcConvert< - Primitives: NodePrimitives>, - >, + RpcConvert: RpcConvert, > + 'static, + PoolConsensusTx: TxHashRef, { /// Handler for `eth_subscribe` async fn subscribe( @@ -392,3 +401,103 @@ where }) } } + +impl, Eth> EthPubSubInner +where + Eth: RpcNodeCore> + + EthApiTypes< + RpcConvert: RpcConvert, + >, +{ + /// Returns a stream that yields all transaction receipts from new blocks. + fn transaction_receipts_stream( + &self, + hashes: Option>, + ) -> impl Stream> { + let eth_api = self.eth_api.clone(); + self.eth_api.provider().canonical_state_stream().flat_map(move |new_chain| { + let mut all_receipts = Vec::new(); + + // Process all blocks in the committed chain - exactly like new_headers_stream pattern + for (block, receipts) in new_chain.committed().blocks_and_receipts() { + match &hashes { + // If no specific hashes requested, process all receipts + None => { + let inputs = build_convert_receipt_inputs(block, receipts); + if let Ok(rpc_receipts) = eth_api + .tx_resp_builder() + .convert_receipts_with_block(inputs, block.sealed_block()) + { + all_receipts.extend(rpc_receipts); + } + } + // If specific hashes requested, filter during iteration to maintain proper + // gas/log indexing + Some(target_hashes) => { + use std::collections::HashSet; + let hash_set: HashSet<_> = target_hashes.iter().collect(); + + // Build inputs manually for filtered transactions, maintaining proper order + let block_number = block.header().number(); + let base_fee = block.header().base_fee_per_gas(); + let block_hash = block.hash(); + let excess_blob_gas = block.header().excess_blob_gas(); + let timestamp = block.header().timestamp(); + let mut gas_used = 0; + let mut next_log_index = 0; + + let filtered_inputs: Vec<_> = block + .transactions_recovered() + .zip(receipts.iter()) + .enumerate() + .filter_map(|(idx, (tx, receipt))| { + let cumulative_gas_used = receipt.cumulative_gas_used(); + let logs_len = receipt.logs().len(); + + let current_gas_used = cumulative_gas_used - gas_used; + let current_log_index = next_log_index; + + // Update for next iteration + gas_used = cumulative_gas_used; + next_log_index += logs_len; + + // Only include if hash matches + hash_set.contains(tx.tx_hash()).then(|| { + let meta = TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }; + + ConvertReceiptInput { + tx, + gas_used: current_gas_used, + next_log_index: current_log_index, + meta, + receipt: receipt.clone(), + } + }) + }) + .collect(); + + if !filtered_inputs.is_empty() && + let Ok(rpc_receipts) = + eth_api.tx_resp_builder().convert_receipts_with_block( + filtered_inputs, + block.sealed_block(), + ) + { + all_receipts.extend(rpc_receipts); + } + } + } + } + + futures::stream::iter(all_receipts) + }) + } +}