From 8daf8a947cd623f843dc52ffb0a1fae9320d3260 Mon Sep 17 00:00:00 2001 From: futreall <86553580+futreall@users.noreply.github.com> Date: Thu, 16 Oct 2025 12:28:39 +0300 Subject: [PATCH 1/3] feat: add subscription stream function for transaction receipts --- crates/rpc/rpc-convert/src/lib.rs | 4 +- crates/rpc/rpc-convert/src/transaction.rs | 61 ++++++++++++++++++++- crates/rpc/rpc/src/eth/pubsub.rs | 65 ++++++++++++++++++++++- 3 files changed, 126 insertions(+), 4 deletions(-) diff --git a/crates/rpc/rpc-convert/src/lib.rs b/crates/rpc/rpc-convert/src/lib.rs index 9844b17b604..8001e69f2f8 100644 --- a/crates/rpc/rpc-convert/src/lib.rs +++ b/crates/rpc/rpc-convert/src/lib.rs @@ -21,8 +21,8 @@ pub use fees::{CallFees, CallFeesError}; pub use receipt::TryFromReceiptResponse; pub use rpc::*; pub use transaction::{ - EthTxEnvError, IntoRpcTx, RpcConvert, RpcConverter, TransactionConversionError, - TryFromTransactionResponse, TryIntoSimTx, TxInfoMapper, + build_convert_receipt_inputs, EthTxEnvError, IntoRpcTx, RpcConvert, RpcConverter, + TransactionConversionError, TryFromTransactionResponse, TryIntoSimTx, TxInfoMapper, }; #[cfg(feature = "op")] diff --git a/crates/rpc/rpc-convert/src/transaction.rs b/crates/rpc/rpc-convert/src/transaction.rs index a89104bcbaf..924d2d50364 100644 --- a/crates/rpc/rpc-convert/src/transaction.rs +++ b/crates/rpc/rpc-convert/src/transaction.rs @@ -5,7 +5,9 @@ use crate::{ RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq, RpcTypes, }; use alloy_consensus::{ - error::ValueError, transaction::Recovered, EthereumTxEnvelope, Sealable, TxEip4844, + error::ValueError, + transaction::{Recovered, TxHashRef}, + BlockHeader, EthereumTxEnvelope, Sealable, TxEip4844, TxReceipt, }; use alloy_network::Network; use alloy_primitives::{Address, TxKind, U256}; @@ -1100,6 +1102,63 @@ impl TryFromTransactionResponse } } +/// Helper function to build `ConvertReceiptInput` from block and receipts. +/// +/// This function takes a block and its receipts and creates the input needed for receipt +/// conversion. It handles the complex logic of calculating gas used and log indices for each +/// transaction. +/// +/// This is extracted from the pattern used in `eth_getBlockReceipts` to enable reuse in +/// subscription streams. +pub fn build_convert_receipt_inputs<'a, N: NodePrimitives>( + block: &'a reth_primitives_traits::RecoveredBlock, + receipts: &'a [N::Receipt], +) -> Vec> +where + N::SignedTx: TxHashRef, +{ + let block_number = block.header().number(); + let base_fee = block.header().base_fee_per_gas(); + let block_hash = block.hash(); + let excess_blob_gas = block.header().excess_blob_gas(); + let timestamp = block.header().timestamp(); + let mut gas_used = 0; + let mut next_log_index = 0; + + block + .transactions_recovered() + .zip(receipts.iter()) + .enumerate() + .map(|(idx, (tx, receipt))| { + let meta = reth_primitives_traits::TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }; + + let cumulative_gas_used = receipt.cumulative_gas_used(); + let logs_len = receipt.logs().len(); + + let input = ConvertReceiptInput { + tx, + gas_used: cumulative_gas_used - gas_used, + next_log_index, + meta, + receipt: receipt.clone(), + }; + + gas_used = cumulative_gas_used; + next_log_index += logs_len; + + input + }) + .collect() +} + #[cfg(feature = "op")] impl TryFromTransactionResponse for reth_optimism_primitives::OpTransactionSigned diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 985cdf3129e..7cf0234db70 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use alloy_consensus::transaction::TxHashRef; use alloy_primitives::{TxHash, U256}; use alloy_rpc_types_eth::{ pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata}, @@ -14,6 +15,7 @@ use jsonrpsee::{ use reth_chain_state::CanonStateSubscriptions; use reth_network_api::NetworkInfo; use reth_primitives_traits::NodePrimitives; +use reth_rpc_convert::build_convert_receipt_inputs; use reth_rpc_eth_api::{ pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction, }; @@ -201,7 +203,13 @@ where Ok(()) } _ => { - // TODO: implement once https://github.com/alloy-rs/alloy/pull/2974 is released + // For now, check if this is a TransactionReceipts subscription kind + // This is a temporary solution until alloy#2974 is merged + if format!("{:?}", kind).contains("TransactionReceipts") { + // We need to handle this differently since transaction_receipts_stream + // has different trait bounds than the current context + return Err(invalid_params_rpc_err("TransactionReceipts subscription requires additional trait bounds - see alloy#2974")); + } Err(invalid_params_rpc_err("Unsupported subscription kind")) } } @@ -372,7 +380,46 @@ where futures::stream::iter(headers) }) } +} + +impl, Eth> EthPubSubInner +where + Eth: RpcNodeCore> + + EthApiTypes< + RpcConvert: RpcConvert, + >, +{ + /// Returns a stream that yields all transaction receipts from new blocks. + fn transaction_receipts_stream( + &self, + ) -> impl Stream> { + let eth_api = self.eth_api.clone(); + self.eth_api.provider().canonical_state_stream().flat_map(move |new_chain| { + let mut all_receipts = Vec::new(); + + // Process all blocks in the committed chain - exactly like new_headers_stream pattern + for (block, receipts) in new_chain.committed().blocks_and_receipts() { + // Build ConvertReceiptInput using our helper function + let inputs = build_convert_receipt_inputs(block, receipts); + + // Convert to RPC receipts using the same pattern as eth_getBlockReceipts + if let Ok(rpc_receipts) = eth_api + .tx_resp_builder() + .convert_receipts_with_block(inputs, block.sealed_block()) + { + all_receipts.extend(rpc_receipts); + } + } + + futures::stream::iter(all_receipts) + }) + } +} +impl EthPubSubInner +where + Eth: RpcNodeCore>, +{ /// Returns a stream that yields all logs that match the given filter. fn log_stream(&self, filter: Filter) -> impl Stream { BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state()) @@ -392,3 +439,19 @@ where }) } } + +// Specialized impl block for transaction receipts functionality +impl, Eth> EthPubSub +where + Eth: RpcNodeCore> + + EthApiTypes< + RpcConvert: RpcConvert, + >, +{ + /// Returns a stream that yields all transaction receipts from new blocks. + pub fn transaction_receipts_stream( + &self, + ) -> impl Stream> { + self.inner.transaction_receipts_stream() + } +} From 3bfaeed1630cf3af77eb5d1c435510d0d8a3c5c3 Mon Sep 17 00:00:00 2001 From: futreall <86553580+futreall@users.noreply.github.com> Date: Thu, 16 Oct 2025 14:56:39 +0300 Subject: [PATCH 2/3] feat: implement transaction_receipts_stream for EthPubSub --- crates/rpc/rpc/src/eth/pubsub.rs | 168 ++++++++++++++++++++----------- 1 file changed, 112 insertions(+), 56 deletions(-) diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 7cf0234db70..34f4756b2ab 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use alloy_consensus::transaction::TxHashRef; +use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt}; use alloy_primitives::{TxHash, U256}; use alloy_rpc_types_eth::{ pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata}, @@ -14,8 +14,8 @@ use jsonrpsee::{ }; use reth_chain_state::CanonStateSubscriptions; use reth_network_api::NetworkInfo; -use reth_primitives_traits::NodePrimitives; -use reth_rpc_convert::build_convert_receipt_inputs; +use reth_primitives_traits::{NodePrimitives, TransactionMeta}; +use reth_rpc_convert::{build_convert_receipt_inputs, transaction::ConvertReceiptInput}; use reth_rpc_eth_api::{ pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction, }; @@ -202,20 +202,31 @@ where Ok(()) } - _ => { - // For now, check if this is a TransactionReceipts subscription kind - // This is a temporary solution until alloy#2974 is merged - if format!("{:?}", kind).contains("TransactionReceipts") { - // We need to handle this differently since transaction_receipts_stream - // has different trait bounds than the current context - return Err(invalid_params_rpc_err("TransactionReceipts subscription requires additional trait bounds - see alloy#2974")); - } - Err(invalid_params_rpc_err("Unsupported subscription kind")) - } + _ => Err(invalid_params_rpc_err("Unsupported subscription kind")), } } } +// Additional impl block for transaction receipts functionality +impl, Eth> EthPubSub +where + Eth: RpcNodeCore< + Provider: BlockNumReader + CanonStateSubscriptions, + Pool: TransactionPool, + Network: NetworkInfo, + > + EthApiTypes< + RpcConvert: RpcConvert, + >, +{ + /// Returns a stream that yields all transaction receipts from new blocks. + pub fn transaction_receipts_stream( + &self, + hashes: Option>, + ) -> impl Stream> { + self.inner.transaction_receipts_stream(hashes) + } +} + #[async_trait::async_trait] impl EthPubSubApiServer> for EthPubSub where @@ -380,46 +391,7 @@ where futures::stream::iter(headers) }) } -} - -impl, Eth> EthPubSubInner -where - Eth: RpcNodeCore> - + EthApiTypes< - RpcConvert: RpcConvert, - >, -{ - /// Returns a stream that yields all transaction receipts from new blocks. - fn transaction_receipts_stream( - &self, - ) -> impl Stream> { - let eth_api = self.eth_api.clone(); - self.eth_api.provider().canonical_state_stream().flat_map(move |new_chain| { - let mut all_receipts = Vec::new(); - - // Process all blocks in the committed chain - exactly like new_headers_stream pattern - for (block, receipts) in new_chain.committed().blocks_and_receipts() { - // Build ConvertReceiptInput using our helper function - let inputs = build_convert_receipt_inputs(block, receipts); - - // Convert to RPC receipts using the same pattern as eth_getBlockReceipts - if let Ok(rpc_receipts) = eth_api - .tx_resp_builder() - .convert_receipts_with_block(inputs, block.sealed_block()) - { - all_receipts.extend(rpc_receipts); - } - } - - futures::stream::iter(all_receipts) - }) - } -} -impl EthPubSubInner -where - Eth: RpcNodeCore>, -{ /// Returns a stream that yields all logs that match the given filter. fn log_stream(&self, filter: Filter) -> impl Stream { BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state()) @@ -440,8 +412,7 @@ where } } -// Specialized impl block for transaction receipts functionality -impl, Eth> EthPubSub +impl, Eth> EthPubSubInner where Eth: RpcNodeCore> + EthApiTypes< @@ -449,9 +420,94 @@ where >, { /// Returns a stream that yields all transaction receipts from new blocks. - pub fn transaction_receipts_stream( + fn transaction_receipts_stream( &self, + hashes: Option>, ) -> impl Stream> { - self.inner.transaction_receipts_stream() + let eth_api = self.eth_api.clone(); + self.eth_api.provider().canonical_state_stream().flat_map(move |new_chain| { + let mut all_receipts = Vec::new(); + + // Process all blocks in the committed chain - exactly like new_headers_stream pattern + for (block, receipts) in new_chain.committed().blocks_and_receipts() { + match &hashes { + // If no specific hashes requested, process all receipts + None => { + let inputs = build_convert_receipt_inputs(block, receipts); + if let Ok(rpc_receipts) = eth_api + .tx_resp_builder() + .convert_receipts_with_block(inputs, block.sealed_block()) + { + all_receipts.extend(rpc_receipts); + } + } + // If specific hashes requested, filter during iteration to maintain proper + // gas/log indexing + Some(target_hashes) => { + use std::collections::HashSet; + let hash_set: HashSet<_> = target_hashes.iter().collect(); + + // Build inputs manually for filtered transactions, maintaining proper order + let block_number = block.header().number(); + let base_fee = block.header().base_fee_per_gas(); + let block_hash = block.hash(); + let excess_blob_gas = block.header().excess_blob_gas(); + let timestamp = block.header().timestamp(); + let mut gas_used = 0; + let mut next_log_index = 0; + + let filtered_inputs: Vec<_> = block + .transactions_recovered() + .zip(receipts.iter()) + .enumerate() + .filter_map(|(idx, (tx, receipt))| { + let cumulative_gas_used = receipt.cumulative_gas_used(); + let logs_len = receipt.logs().len(); + + let current_gas_used = cumulative_gas_used - gas_used; + let current_log_index = next_log_index; + + // Update for next iteration + gas_used = cumulative_gas_used; + next_log_index += logs_len; + + // Only include if hash matches + hash_set.contains(tx.tx_hash()).then(|| { + let meta = TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }; + + ConvertReceiptInput { + tx, + gas_used: current_gas_used, + next_log_index: current_log_index, + meta, + receipt: receipt.clone(), + } + }) + }) + .collect(); + + if !filtered_inputs.is_empty() && + let Ok(rpc_receipts) = + eth_api.tx_resp_builder().convert_receipts_with_block( + filtered_inputs, + block.sealed_block(), + ) + { + all_receipts.extend(rpc_receipts); + } + } + } + } + + futures::stream::iter(all_receipts) + }) } } From 90f3cafd2a3cc6c649057f7838fd9864528ee682 Mon Sep 17 00:00:00 2001 From: futreall <86553580+futreall@users.noreply.github.com> Date: Thu, 16 Oct 2025 19:43:23 +0300 Subject: [PATCH 3/3] feat: add transaction receipts subscription stream --- crates/rpc/rpc/src/eth/pubsub.rs | 48 +++++++++++++------------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 34f4756b2ab..bfb4c64741b 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -57,17 +57,16 @@ impl EthPubSub { } } -impl EthPubSub +impl>, Eth> EthPubSub where Eth: RpcNodeCore< Provider: BlockNumReader + CanonStateSubscriptions, Pool: TransactionPool, Network: NetworkInfo, > + EthApiTypes< - RpcConvert: RpcConvert< - Primitives: NodePrimitives>, - >, + RpcConvert: RpcConvert, >, + PoolConsensusTx: TxHashRef, { /// Returns the current sync status for the `syncing` subscription pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus { @@ -96,6 +95,14 @@ where self.inner.log_stream(filter) } + /// Returns a stream that yields all transaction receipts from new blocks. + pub fn transaction_receipts_stream( + &self, + hashes: Option>, + ) -> impl Stream> { + self.inner.transaction_receipts_stream(hashes) + } + /// The actual handler for an accepted [`EthPubSub::subscribe`] call. pub async fn handle_accepted( &self, @@ -202,13 +209,17 @@ where Ok(()) } - _ => Err(invalid_params_rpc_err("Unsupported subscription kind")), + _ => { + // TODO: implement once https://github.com/alloy-rs/alloy/pull/2974 is released + Err(invalid_params_rpc_err("Unsupported subscription kind")) + } } } } -// Additional impl block for transaction receipts functionality -impl, Eth> EthPubSub +#[async_trait::async_trait] +impl>, Eth> + EthPubSubApiServer> for EthPubSub where Eth: RpcNodeCore< Provider: BlockNumReader + CanonStateSubscriptions, @@ -216,29 +227,8 @@ where Network: NetworkInfo, > + EthApiTypes< RpcConvert: RpcConvert, - >, -{ - /// Returns a stream that yields all transaction receipts from new blocks. - pub fn transaction_receipts_stream( - &self, - hashes: Option>, - ) -> impl Stream> { - self.inner.transaction_receipts_stream(hashes) - } -} - -#[async_trait::async_trait] -impl EthPubSubApiServer> for EthPubSub -where - Eth: RpcNodeCore< - Provider: BlockNumReader + CanonStateSubscriptions, - Pool: TransactionPool, - Network: NetworkInfo, - > + EthApiTypes< - RpcConvert: RpcConvert< - Primitives: NodePrimitives>, - >, > + 'static, + PoolConsensusTx: TxHashRef, { /// Handler for `eth_subscribe` async fn subscribe(