Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/rpc/rpc-convert/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub use fees::{CallFees, CallFeesError};
pub use receipt::TryFromReceiptResponse;
pub use rpc::*;
pub use transaction::{
EthTxEnvError, IntoRpcTx, RpcConvert, RpcConverter, TransactionConversionError,
TryFromTransactionResponse, TryIntoSimTx, TxInfoMapper,
build_convert_receipt_inputs, EthTxEnvError, IntoRpcTx, RpcConvert, RpcConverter,
TransactionConversionError, TryFromTransactionResponse, TryIntoSimTx, TxInfoMapper,
};

#[cfg(feature = "op")]
Expand Down
61 changes: 60 additions & 1 deletion crates/rpc/rpc-convert/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::{
RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq, RpcTypes,
};
use alloy_consensus::{
error::ValueError, transaction::Recovered, EthereumTxEnvelope, Sealable, TxEip4844,
error::ValueError,
transaction::{Recovered, TxHashRef},
BlockHeader, EthereumTxEnvelope, Sealable, TxEip4844, TxReceipt,
};
use alloy_network::Network;
use alloy_primitives::{Address, TxKind, U256};
Expand Down Expand Up @@ -1100,6 +1102,63 @@ impl TryFromTransactionResponse<alloy_network::Ethereum>
}
}

/// Helper function to build `ConvertReceiptInput` from block and receipts.
///
/// This function takes a block and its receipts and creates the input needed for receipt
/// conversion. It handles the complex logic of calculating gas used and log indices for each
/// transaction.
///
/// This is extracted from the pattern used in `eth_getBlockReceipts` to enable reuse in
/// subscription streams.
pub fn build_convert_receipt_inputs<'a, N: NodePrimitives>(
block: &'a reth_primitives_traits::RecoveredBlock<N::Block>,
receipts: &'a [N::Receipt],
) -> Vec<ConvertReceiptInput<'a, N>>
where
N::SignedTx: TxHashRef,
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@klkvr thoughts on this

whould we perhaps make this a native fn of RecoveredBlock ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep sgtm, thoughConvertReceiptInput::from_block_and_receipts might be eaiser here because ConvertReceiptInput is RPC-only type

also I would prefer this to be re-used in

let inputs = block
.transactions_recovered()
.zip(Arc::unwrap_or_clone(receipts))
.enumerate()
.map(|(idx, (tx, receipt))| {
let meta = TransactionMeta {
tx_hash: *tx.tx_hash(),
index: idx as u64,
block_hash,
block_number,
base_fee,
excess_blob_gas,
timestamp,
};
let cumulative_gas_used = receipt.cumulative_gas_used();
let logs_len = receipt.logs().len();
let input = ConvertReceiptInput {
tx,
gas_used: cumulative_gas_used - gas_used,
next_log_index,
meta,
receipt,
};
gas_used = cumulative_gas_used;
next_log_index += logs_len;
input
})
.collect::<Vec<_>>();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattsse separate PR for the refactor or include here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@futreall let's do a separate one for this

let block_number = block.header().number();
let base_fee = block.header().base_fee_per_gas();
let block_hash = block.hash();
let excess_blob_gas = block.header().excess_blob_gas();
let timestamp = block.header().timestamp();
let mut gas_used = 0;
let mut next_log_index = 0;

block
.transactions_recovered()
.zip(receipts.iter())
.enumerate()
.map(|(idx, (tx, receipt))| {
let meta = reth_primitives_traits::TransactionMeta {
tx_hash: *tx.tx_hash(),
index: idx as u64,
block_hash,
block_number,
base_fee,
excess_blob_gas,
timestamp,
};

let cumulative_gas_used = receipt.cumulative_gas_used();
let logs_len = receipt.logs().len();

let input = ConvertReceiptInput {
tx,
gas_used: cumulative_gas_used - gas_used,
next_log_index,
meta,
receipt: receipt.clone(),
};

gas_used = cumulative_gas_used;
next_log_index += logs_len;

input
})
.collect()
}

#[cfg(feature = "op")]
impl TryFromTransactionResponse<op_alloy_network::Optimism>
for reth_optimism_primitives::OpTransactionSigned
Expand Down
129 changes: 124 additions & 5 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::sync::Arc;

use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
use alloy_primitives::{TxHash, U256};
use alloy_rpc_types_eth::{
pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
Expand All @@ -13,7 +14,8 @@ use jsonrpsee::{
};
use reth_chain_state::CanonStateSubscriptions;
use reth_network_api::NetworkInfo;
use reth_primitives_traits::NodePrimitives;
use reth_primitives_traits::{NodePrimitives, TransactionMeta};
use reth_rpc_convert::{build_convert_receipt_inputs, transaction::ConvertReceiptInput};
use reth_rpc_eth_api::{
pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
};
Expand Down Expand Up @@ -200,14 +202,31 @@ where

Ok(())
}
_ => {
// TODO: implement once https://github.com/alloy-rs/alloy/pull/2974 is released
Err(invalid_params_rpc_err("Unsupported subscription kind"))
}
_ => Err(invalid_params_rpc_err("Unsupported subscription kind")),
}
}
}

// Additional impl block for transaction receipts functionality
impl<N: NodePrimitives<SignedTx: TxHashRef>, Eth> EthPubSub<Eth>
where
Eth: RpcNodeCore<
Provider: BlockNumReader + CanonStateSubscriptions<Primitives = N>,
Pool: TransactionPool,
Network: NetworkInfo,
> + EthApiTypes<
RpcConvert: RpcConvert<Primitives = N, Network = Eth::NetworkTypes, Error = Eth::Error>,
>,
{
/// Returns a stream that yields all transaction receipts from new blocks.
pub fn transaction_receipts_stream(
&self,
hashes: Option<Vec<TxHash>>,
) -> impl Stream<Item = reth_rpc_convert::RpcReceipt<Eth::NetworkTypes>> {
self.inner.transaction_receipts_stream(hashes)
}
}

#[async_trait::async_trait]
impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
where
Expand Down Expand Up @@ -392,3 +411,103 @@ where
})
}
}

impl<N: NodePrimitives<SignedTx: TxHashRef>, Eth> EthPubSubInner<Eth>
where
Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>
+ EthApiTypes<
RpcConvert: RpcConvert<Primitives = N, Network = Eth::NetworkTypes, Error = Eth::Error>,
>,
{
/// Returns a stream that yields all transaction receipts from new blocks.
fn transaction_receipts_stream(
&self,
hashes: Option<Vec<TxHash>>,
) -> impl Stream<Item = reth_rpc_convert::RpcReceipt<Eth::NetworkTypes>> {
let eth_api = self.eth_api.clone();
self.eth_api.provider().canonical_state_stream().flat_map(move |new_chain| {
let mut all_receipts = Vec::new();

// Process all blocks in the committed chain - exactly like new_headers_stream pattern
for (block, receipts) in new_chain.committed().blocks_and_receipts() {
match &hashes {
// If no specific hashes requested, process all receipts
None => {
let inputs = build_convert_receipt_inputs(block, receipts);
if let Ok(rpc_receipts) = eth_api
.tx_resp_builder()
.convert_receipts_with_block(inputs, block.sealed_block())
{
all_receipts.extend(rpc_receipts);
}
}
// If specific hashes requested, filter during iteration to maintain proper
// gas/log indexing
Some(target_hashes) => {
use std::collections::HashSet;
let hash_set: HashSet<_> = target_hashes.iter().collect();

// Build inputs manually for filtered transactions, maintaining proper order
let block_number = block.header().number();
let base_fee = block.header().base_fee_per_gas();
let block_hash = block.hash();
let excess_blob_gas = block.header().excess_blob_gas();
let timestamp = block.header().timestamp();
let mut gas_used = 0;
let mut next_log_index = 0;

let filtered_inputs: Vec<_> = block
.transactions_recovered()
.zip(receipts.iter())
.enumerate()
.filter_map(|(idx, (tx, receipt))| {
let cumulative_gas_used = receipt.cumulative_gas_used();
let logs_len = receipt.logs().len();

let current_gas_used = cumulative_gas_used - gas_used;
let current_log_index = next_log_index;

// Update for next iteration
gas_used = cumulative_gas_used;
next_log_index += logs_len;

// Only include if hash matches
hash_set.contains(tx.tx_hash()).then(|| {
let meta = TransactionMeta {
tx_hash: *tx.tx_hash(),
index: idx as u64,
block_hash,
block_number,
base_fee,
excess_blob_gas,
timestamp,
};

ConvertReceiptInput {
tx,
gas_used: current_gas_used,
next_log_index: current_log_index,
meta,
receipt: receipt.clone(),
}
})
})
.collect();

if !filtered_inputs.is_empty() &&
let Ok(rpc_receipts) =
eth_api.tx_resp_builder().convert_receipts_with_block(
filtered_inputs,
block.sealed_block(),
)
{
all_receipts.extend(rpc_receipts);
}
}
}
}

futures::stream::iter(all_receipts)
})
}
}