diff --git a/Cargo.lock b/Cargo.lock index afeaddc256..ed7c9bd6f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1051,16 +1051,22 @@ dependencies = [ "alpen-reth-rpc", "clap", "eyre", + "futures-util", "reth-chainspec", "reth-cli-commands", "reth-cli-runner", "reth-cli-util", "reth-db", + "reth-network", + "reth-network-api", "reth-node-builder", "reth-node-core", + "reth-primitives", + "reth-transaction-pool", "rockbound", "sled", "strata-db-store-rocksdb", + "tokio", "tracing", "typed-sled", ] @@ -1135,20 +1141,26 @@ dependencies = [ "alloy-consensus", "alloy-eips", "alloy-network", + "alloy-primitives", + "alloy-rlp", "alloy-rpc-types", "alpen-reth-evm", "alpen-reth-primitives", "alpen-reth-rpc", "eyre", + "futures", "reth-basic-payload-builder", "reth-chain-state", "reth-chainspec", "reth-errors", + "reth-eth-wire", "reth-ethereum-engine-primitives", "reth-ethereum-payload-builder", "reth-ethereum-primitives", "reth-evm", "reth-evm-ethereum", + "reth-network", + "reth-network-api", "reth-node-api", "reth-node-builder", "reth-node-ethereum", @@ -1162,6 +1174,8 @@ dependencies = [ "revm", "revm-primitives", "serde", + "tokio", + "tokio-stream", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 30b30c3291..acd0f12b01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,6 @@ members = [ "crates/test-utils/test-utils", "crates/test-utils/tx-indexer", "crates/util/mmr", - "crates/db-tests", "provers/risc0", "provers/sp1", @@ -240,6 +239,7 @@ reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-engine-local = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-engine-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-errors = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } +reth-eth-wire = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-ethereum-engine-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-ethereum-forks = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-ethereum-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } @@ -248,7 +248,9 @@ reth-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-evm-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-ipc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } +reth-network = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-network-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } +reth-network-p2p = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-node-core = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } @@ -262,6 +264,7 @@ reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0", reth-primitives-traits = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0", default-features = false } reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-revm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } +reth-rlp = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-rpc-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" } @@ -349,6 +352,7 @@ terrors = "0.3.0" thiserror = "2.0.11" threadpool = "1.8" tokio = { version = "1.37", features = ["full"] } +tokio-stream = "0.1.17" toml = "0.5" tower = "0.5" tracing = "0.1" diff --git a/bin/alpen-reth/Cargo.toml b/bin/alpen-reth/Cargo.toml index 1561795dbc..1fa5ccf58e 100644 --- a/bin/alpen-reth/Cargo.toml +++ b/bin/alpen-reth/Cargo.toml @@ -33,10 +33,16 @@ reth-cli-commands.workspace = true reth-cli-runner.workspace = true reth-cli-util.workspace = true reth-db.workspace = true +reth-network.workspace = true +reth-network-api.workspace = true reth-node-builder.workspace = true reth-node-core.workspace = true +reth-transaction-pool.workspace = true +reth-primitives.workspace = true +futures-util.workspace = true rockbound = { workspace = true, optional = true } sled = { workspace = true, optional = true } strata-db-store-rocksdb = { workspace = true, optional = true } +tokio.workspace = true tracing.workspace = true typed-sled = { workspace = true, optional = true } diff --git a/bin/alpen-reth/src/main.rs b/bin/alpen-reth/src/main.rs index 025cd39d95..72201a151c 100644 --- a/bin/alpen-reth/src/main.rs +++ b/bin/alpen-reth/src/main.rs @@ -12,19 +12,32 @@ compile_error!( mod init_db; -use std::sync::Arc; +use std::{collections::HashMap, process, sync::Arc}; use alpen_chainspec::{chain_value_parser, AlpenChainSpecParser}; use alpen_reth_exex::{ProverWitnessGenerator, StateDiffGenerator}; -use alpen_reth_node::{args::AlpenNodeArgs, AlpenEthereumNode}; +use alpen_reth_node::{ + args::AlpenNodeArgs, + head_gossip::{ + connection::HeadGossipCommand, + handler::{HeadGossipEvent, HeadGossipProtocolHandler, HeadGossipState}, + }, + AlpenEthereumNode, +}; use alpen_reth_rpc::{AlpenRPC, StrataRpcApiServer}; use clap::Parser; +use futures_util::StreamExt; use init_db::init_witness_db; use reth_chainspec::ChainSpec; use reth_cli_commands::{launcher::FnLauncher, node::NodeCommand}; use reth_cli_runner::CliRunner; +use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; +use reth_network_api::PeerId; use reth_node_builder::{NodeBuilder, WithLaunchContext}; use reth_node_core::args::LogArgs; +use reth_primitives::Header; +use reth_transaction_pool::{FullTransactionEvent, TransactionPool}; +use tokio::sync::mpsc; use tracing::info; fn main() { @@ -93,11 +106,66 @@ fn main() { }); let handle = node_builder.launch().await?; + + // Create and add the custom RLPx subprotocol + // See: crates/reth/node/src/head_gossip/ + let (gossip_tx, mut gossip_rx) = mpsc::unbounded_channel(); + let handler = HeadGossipProtocolHandler { + state: HeadGossipState { events: gossip_tx }, + }; + handle + .node + .network + .add_rlpx_sub_protocol(handler.into_rlpx_sub_protocol()); + + // This task will handle gossip events + let mut tx_events = handle.node.pool.all_transactions_event_listener(); + tokio::spawn(async move { + let mut connections: HashMap> = + HashMap::new(); + loop { + tokio::select! { + Some(event) = gossip_rx.recv() => { + match event { + HeadGossipEvent::Established { + peer_id, + to_connection, + .. + } => { + connections.insert(peer_id, to_connection); + } + HeadGossipEvent::Closed { peer_id } => { + connections.remove(&peer_id); + } + HeadGossipEvent::HeadHash { peer_id, header } => { + // TODO(@storopoli): do something when we receive a single `Header`. + info!(target: "head-gossip", "Received head hash from peer {}: {:?}", peer_id, header.hash_slow()); + } + HeadGossipEvent::HeadHashes { peer_id, headers } => { + // TODO(@storopoli): do something when we receive multiple `Header`s. + info!(target: "head-gossip", "Received {} head hashes from peer {}", headers.len(), peer_id); + } + } + }, + Some(event) = tx_events.next() => { + // TODO(@storopoli): How to get a `Header`? + if let FullTransactionEvent::Mined { block_hash, .. } = event { + info!(target: "head-gossip", "New block mined: {:?}, broadcasting to {} peers", block_hash, connections.len()); + let header = Header::default(); + for sender in connections.values() { + let _ = sender.send(HeadGossipCommand::SendHeadHash(header.clone())); + } + } + } + } + } + }); + handle.node_exit_future.await }, ) { eprintln!("Error: {err:?}"); - std::process::exit(1); + process::exit(1); } } diff --git a/crates/reth/node/Cargo.toml b/crates/reth/node/Cargo.toml index 7242e0a72e..a51d16ac0f 100644 --- a/crates/reth/node/Cargo.toml +++ b/crates/reth/node/Cargo.toml @@ -14,17 +14,23 @@ alpen-reth-rpc.workspace = true alloy-consensus.workspace = true alloy-eips.workspace = true alloy-network.workspace = true +alloy-primitives.workspace = true +alloy-rlp.workspace = true alloy-rpc-types.workspace = true eyre.workspace = true +futures.workspace = true reth-basic-payload-builder.workspace = true reth-chain-state.workspace = true reth-chainspec.workspace = true reth-errors.workspace = true +reth-eth-wire.workspace = true reth-ethereum-engine-primitives.workspace = true reth-ethereum-payload-builder.workspace = true reth-ethereum-primitives.workspace = true reth-evm.workspace = true reth-evm-ethereum.workspace = true +reth-network.workspace = true +reth-network-api.workspace = true reth-node-api.workspace = true reth-node-builder.workspace = true reth-node-ethereum.workspace = true @@ -38,4 +44,6 @@ reth-trie-db.workspace = true revm.workspace = true revm-primitives.workspace = true serde.workspace = true +tokio.workspace = true +tokio-stream.workspace = true tracing.workspace = true diff --git a/crates/reth/node/src/head_gossip/connection.rs b/crates/reth/node/src/head_gossip/connection.rs new file mode 100644 index 0000000000..5afcf36204 --- /dev/null +++ b/crates/reth/node/src/head_gossip/connection.rs @@ -0,0 +1,159 @@ +//! Connection handler for the custom RLPx subprotocol. +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use alloy_primitives::bytes::BytesMut; +use futures::{Stream, StreamExt}; +use reth_eth_wire::{ + capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, +}; +use reth_network::protocol::{ConnectionHandler, OnNotSupported}; +use reth_network_api::{Direction, PeerId}; +use reth_primitives::Header; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::head_gossip::{ + handler::{HeadGossipEvent, HeadGossipState}, + protocol::{head_gossip_protocol, HeadGossipMessage, HeadGossipMessageKind}, +}; + +/// Command to send to the connection. +#[allow(clippy::large_enum_variant, reason = "I don't want to box the thing")] +#[derive(Debug)] +pub enum HeadGossipCommand { + /// Send a head hash to the peer. + SendHeadHash(Header), + + /// Send multiple head hashes to the peer. + SendHeadHashes(Vec
), +} + +/// Connection handler for the head gossip protocol. +#[derive(Debug)] +pub struct HeadGossipConnectionHandler { + /// Head gossip state. + pub(crate) state: HeadGossipState, +} + +impl ConnectionHandler for HeadGossipConnectionHandler { + type Connection = HeadGossipConnection; + + fn protocol(&self) -> Protocol { + head_gossip_protocol() + } + + fn on_unsupported_by_peer( + self, + _supported: &SharedCapabilities, + _direction: Direction, + _peer_id: PeerId, + ) -> OnNotSupported { + // make it simple and keep it alive even if other peers do not support + OnNotSupported::KeepAlive + } + + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Self::Connection { + let (tx, rx) = mpsc::unbounded_channel(); + self.state + .events + .send(HeadGossipEvent::Established { + peer_id, + direction, + to_connection: tx, + }) + .ok(); + + HeadGossipConnection { + conn, + commands: UnboundedReceiverStream::new(rx), + peer_id, + events: self.state.events.clone(), + } + } +} + +/// Connection for the head gossip protocol. +#[derive(Debug)] +pub struct HeadGossipConnection { + /// Protocol connection. + conn: ProtocolConnection, + + /// Command stream. + commands: UnboundedReceiverStream, + + /// Peer id. + peer_id: PeerId, + + /// Event sender. + events: mpsc::UnboundedSender, +} + +impl Drop for HeadGossipConnection { + fn drop(&mut self) { + self.events + .send(HeadGossipEvent::Closed { + peer_id: self.peer_id, + }) + .ok(); + } +} + +impl Stream for HeadGossipConnection { + type Item = BytesMut; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + // Poll for outgoing messages + if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { + let msg = match cmd { + HeadGossipCommand::SendHeadHash(header) => { + HeadGossipMessage::new_head_hash(header) + } + HeadGossipCommand::SendHeadHashes(headers) => { + HeadGossipMessage::new_head_hashes(headers) + } + }; + return Poll::Ready(Some(msg.encoded())); + } + + // Poll for incoming messages + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { + return Poll::Ready(None); + }; + + let Some(msg) = HeadGossipMessage::decode_message(&mut &msg[..]) else { + // TODO(@storopoli): maybe disconnect on invalid message + return Poll::Ready(None); + }; + + match msg.message { + HeadGossipMessageKind::HeadHash(header) => { + this.events + .send(HeadGossipEvent::HeadHash { + peer_id: this.peer_id, + header, + }) + .ok(); + } + HeadGossipMessageKind::HeadHashes(headers) => { + this.events + .send(HeadGossipEvent::HeadHashes { + peer_id: this.peer_id, + headers, + }) + .ok(); + } + } + } + } +} diff --git a/crates/reth/node/src/head_gossip/handler.rs b/crates/reth/node/src/head_gossip/handler.rs new file mode 100644 index 0000000000..abcec30dae --- /dev/null +++ b/crates/reth/node/src/head_gossip/handler.rs @@ -0,0 +1,84 @@ +//! Handler for the custom RLPx subprotocol. +use std::net::SocketAddr; + +use reth_network::protocol::ProtocolHandler; +use reth_network_api::{Direction, PeerId}; +use reth_primitives::Header; +use tokio::sync::mpsc; + +use crate::head_gossip::connection::{HeadGossipCommand, HeadGossipConnectionHandler}; + +/// Events emitted by the head gossip protocol. +#[derive(Debug)] +#[allow(clippy::large_enum_variant, reason = "I don't want to box the thing")] +pub enum HeadGossipEvent { + /// New connection was established. + Established { + /// Peer that we established connection from/to. + peer_id: PeerId, + + /// Direction of the connection. + direction: Direction, + + /// Sender channel to the connection. + to_connection: mpsc::UnboundedSender, + }, + + /// Connection was closed. + Closed { + /// Peer that we closed connection. + peer_id: PeerId, + }, + + /// New head hash was received from a peer. + HeadHash { + /// Peer that we received the new head hash. + peer_id: PeerId, + + /// Received [`Header`]. + header: Header, + }, + + /// Multiple head hashes were received from a peer. + HeadHashes { + /// Peer that we received the new head hashes. + peer_id: PeerId, + + /// Received [`Header`]s. + headers: Vec
, + }, +} + +/// State of the protocol. +#[derive(Clone, Debug)] +pub struct HeadGossipState { + /// Channel for sending events to the node. + pub events: mpsc::UnboundedSender, +} + +/// The protocol handler for head gossip. +#[derive(Debug)] +pub struct HeadGossipProtocolHandler { + /// State of the head gossip protocol. + pub state: HeadGossipState, +} + +impl ProtocolHandler for HeadGossipProtocolHandler { + type ConnectionHandler = HeadGossipConnectionHandler; + + fn on_incoming(&self, _socket_addr: SocketAddr) -> Option { + Some(HeadGossipConnectionHandler { + state: self.state.clone(), + }) + } + + fn on_outgoing( + &self, + _socket_addr: SocketAddr, + _peer_id: PeerId, + ) -> Option { + Some(HeadGossipConnectionHandler { + state: self.state.clone(), + }) + } +} diff --git a/crates/reth/node/src/head_gossip/mod.rs b/crates/reth/node/src/head_gossip/mod.rs new file mode 100644 index 0000000000..4a90217e2a --- /dev/null +++ b/crates/reth/node/src/head_gossip/mod.rs @@ -0,0 +1,4 @@ +//! Custom RLPx subprotocol to gossip the head block hash. +pub mod connection; +pub mod handler; +pub mod protocol; diff --git a/crates/reth/node/src/head_gossip/protocol.rs b/crates/reth/node/src/head_gossip/protocol.rs new file mode 100644 index 0000000000..618563e2fd --- /dev/null +++ b/crates/reth/node/src/head_gossip/protocol.rs @@ -0,0 +1,116 @@ +//! RLPx subprotocol for gossiping block head hashes. + +use alloy_primitives::bytes::{Buf, BufMut, BytesMut}; +use alloy_rlp::{Decodable, Encodable}; +use reth_eth_wire::{protocol::Protocol, Capability}; +use reth_primitives::Header; + +/// Head gossip protocol name. +const PROTOCOL_NAME: &str = "head_gossip"; + +/// Head gossip protocol version. +const PROTOCOL_VERSION: usize = 1; + +/// [`Capability`] for the `head_gossip` protocol with version `1`. +pub(crate) const HEAD_GOSSIP_CAPABILITY: Capability = + Capability::new_static(PROTOCOL_NAME, PROTOCOL_VERSION); + +/// [`Protocol`] for the `head_gossip` protocol. +pub(crate) fn head_gossip_protocol() -> Protocol { + // total packets = 2 + Protocol::new(HEAD_GOSSIP_CAPABILITY, 2) +} + +/// Head gossip protocol message IDs. +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum HeadGossipMessageId { + /// ID when sending/receiving a single [`Header`]. + HeadHash = 0x00, + + /// ID when sending/receiving multiple [`Header`]s. + HeadHashes = 0x01, +} + +/// Head gossip protocol message kinds. +#[allow(clippy::large_enum_variant, reason = "I don't want to box the thing")] +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum HeadGossipMessageKind { + /// Single [`Header`] + HeadHash(Header), + + /// Multiple [`Header`]s. + HeadHashes(Vec
), +} + +/// Head gossip protocol messages. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct HeadGossipMessage { + /// Message type. + pub message_type: HeadGossipMessageId, + + /// Underlying message. + pub message: HeadGossipMessageKind, +} + +impl HeadGossipMessage { + /// Creates a new [`HeadGossipMessage`] with a single [`Header`]. + pub(crate) fn new_head_hash(header: Header) -> Self { + Self { + message_type: HeadGossipMessageId::HeadHash, + message: HeadGossipMessageKind::HeadHash(header), + } + } + + /// Creates a new [`HeadGossipMessage`] with multiple [`Header`]s. + pub(crate) fn new_head_hashes(headers: Vec
) -> Self { + Self { + message_type: HeadGossipMessageId::HeadHashes, + message: HeadGossipMessageKind::HeadHashes(headers), + } + } + + /// Encodes a [`HeadGossipMessage`] into bytes. + pub(crate) fn encoded(&self) -> BytesMut { + let mut buf = BytesMut::new(); + buf.put_u8(self.message_type as u8); + match &self.message { + HeadGossipMessageKind::HeadHash(header) => { + header.encode(&mut buf); + } + HeadGossipMessageKind::HeadHashes(headers) => { + headers.encode(&mut buf); + } + } + buf + } + + /// Decodes a [`HeadGossipMessage`] into bytes. + pub(crate) fn decode_message(buf: &mut &[u8]) -> Option { + if buf.is_empty() { + return None; + } + let id = buf[0]; + buf.advance(1); + let message_type = match id { + 0x00 => HeadGossipMessageId::HeadHash, + 0x01 => HeadGossipMessageId::HeadHashes, + _ => return None, + }; + let message = match message_type { + HeadGossipMessageId::HeadHash => { + let header = Header::decode(buf).ok()?; + HeadGossipMessageKind::HeadHash(header) + } + HeadGossipMessageId::HeadHashes => { + let headers = Vec::
::decode(buf).ok()?; + HeadGossipMessageKind::HeadHashes(headers) + } + }; + + Some(Self { + message_type, + message, + }) + } +} diff --git a/crates/reth/node/src/lib.rs b/crates/reth/node/src/lib.rs index 1a62970bf1..09989db1a3 100644 --- a/crates/reth/node/src/lib.rs +++ b/crates/reth/node/src/lib.rs @@ -1,6 +1,7 @@ //! Reth node implementation for the Alpen EE. mod engine; mod evm; +pub mod head_gossip; mod node; mod payload; mod payload_builder;