diff --git a/consensus/notify/src/notification.rs b/consensus/notify/src/notification.rs index a9e758b0a1..b9b51f6975 100644 --- a/consensus/notify/src/notification.rs +++ b/consensus/notify/src/notification.rs @@ -42,6 +42,9 @@ pub enum Notification { #[display(fmt = "NewBlockTemplate notification")] NewBlockTemplate(NewBlockTemplateNotification), + + #[display(fmt = "MempoolSizeChanged notification: size {}", "_0.network_mempool_size")] + MempoolSizeChanged(MempoolSizeChangedNotification), } } @@ -181,3 +184,14 @@ pub struct PruningPointUtxoSetOverrideNotification {} #[derive(Debug, Clone)] pub struct NewBlockTemplateNotification {} + +#[derive(Debug, Clone)] +pub struct MempoolSizeChangedNotification { + pub network_mempool_size: u64, +} + +impl MempoolSizeChangedNotification { + pub fn new(network_mempool_size: u64) -> Self { + Self { network_mempool_size } + } +} diff --git a/notify/src/events.rs b/notify/src/events.rs index e27f51bfa8..b0902646a0 100644 --- a/notify/src/events.rs +++ b/notify/src/events.rs @@ -51,10 +51,11 @@ event_type_enum! { VirtualDaaScoreChanged, PruningPointUtxoSetOverride, NewBlockTemplate, + MempoolSizeChanged, } } -pub const EVENT_COUNT: usize = 9; +pub const EVENT_COUNT: usize = 10; impl FromStr for EventType { type Err = Error; @@ -70,6 +71,7 @@ impl FromStr for EventType { "virtual-daa-score-changed" => Ok(EventType::VirtualDaaScoreChanged), "pruning-point-utxo-set-override" => Ok(EventType::PruningPointUtxoSetOverride), "new-block-template" => Ok(EventType::NewBlockTemplate), + "mempool-size-changed" => Ok(EventType::MempoolSizeChanged), _ => Err(Error::InvalidEventType(s.to_string())), } } diff --git a/notify/src/scope.rs b/notify/src/scope.rs index 1fe7924711..b564dd2c57 100644 --- a/notify/src/scope.rs +++ b/notify/src/scope.rs @@ -45,6 +45,7 @@ pub enum Scope { VirtualDaaScoreChanged, PruningPointUtxoSetOverride, NewBlockTemplate, + MempoolSizeChanged, } } @@ -266,3 +267,20 @@ impl Deserializer for NewBlockTemplateScope { Ok(Self {}) } } + +#[derive(Clone, Display, Debug, Default, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize)] +pub struct MempoolSizeChangedScope {} + +impl Serializer for MempoolSizeChangedScope { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + store!(u16, &1, writer)?; + Ok(()) + } +} + +impl Deserializer for MempoolSizeChangedScope { + fn deserialize(reader: &mut R) -> std::io::Result { + let _version = load!(u16, reader)?; + Ok(Self {}) + } +} diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index 5aeff6d1b5..9d22000c5c 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -14,7 +14,7 @@ use kaspa_consensus_core::config::Config; use kaspa_consensus_core::errors::block::RuleError; use kaspa_consensus_core::tx::{Transaction, TransactionId}; use kaspa_consensus_notify::{ - notification::{Notification, PruningPointUtxoSetOverrideNotification}, + notification::{MempoolSizeChangedNotification, Notification, PruningPointUtxoSetOverrideNotification}, root::ConsensusNotificationRoot, }; use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy, ConsensusSessionOwned}; @@ -26,6 +26,7 @@ use kaspa_core::{ use kaspa_core::{time::unix_now, warn}; use kaspa_hashes::Hash; use kaspa_mining::mempool::tx::{Orphan, Priority}; +use kaspa_mining::model::tx_query::TransactionQuery; use kaspa_mining::{manager::MiningManagerProxy, mempool::tx::RbfPolicy}; use kaspa_notify::notifier::Notify; use kaspa_p2p_lib::{ @@ -624,6 +625,7 @@ impl FlowContext { .await; } } + context.on_transaction_added_to_mempool().await; context.mempool_scanning_is_done().await; debug!("<> Mempool scanning task is done"); }); @@ -649,7 +651,10 @@ impl FlowContext { /// Notifies that a transaction has been added to the mempool. pub async fn on_transaction_added_to_mempool(&self) { - // TODO: call a handler function or a predefined registered service + let network_mempool_size = self.mining_manager().clone().transaction_count(TransactionQuery::TransactionsOnly).await as u64; + + let _ = + self.notification_root.notify(Notification::MempoolSizeChanged(MempoolSizeChangedNotification::new(network_mempool_size))); } /// Adds the rpc-submitted transaction to the mempool and propagates it to peers. @@ -673,6 +678,9 @@ impl FlowContext { false, // RPC transactions are considered high priority, so we don't want to throttle them ) .await; + + self.on_transaction_added_to_mempool().await; + Ok(()) } @@ -698,6 +706,9 @@ impl FlowContext { false, // RPC transactions are considered high priority, so we don't want to throttle them ) .await; + + self.on_transaction_added_to_mempool().await; + // The combination of args above of Orphan::Forbidden and RbfPolicy::Mandatory should always result // in a removed transaction returned, however we prefer failing gracefully in case of future internal mempool changes transaction_insertion.removed.ok_or(ProtocolError::Other( diff --git a/rpc/core/src/api/notifications.rs b/rpc/core/src/api/notifications.rs index 503af0de85..c6887b9133 100644 --- a/rpc/core/src/api/notifications.rs +++ b/rpc/core/src/api/notifications.rs @@ -48,6 +48,9 @@ pub enum Notification { #[display(fmt = "NewBlockTemplate notification")] NewBlockTemplate(NewBlockTemplateNotification), + + #[display(fmt = "MempoolSizeChanged notification: size {}", "_0.network_mempool_size")] + MempoolSizeChanged(MempoolSizeChangedNotification), } } @@ -64,6 +67,7 @@ impl Notification { Notification::VirtualDaaScoreChanged(v) => to_value(&v), Notification::SinkBlueScoreChanged(v) => to_value(&v), Notification::VirtualChainChanged(v) => to_value(&v), + Notification::MempoolSizeChanged(v) => to_value(&v), } } } @@ -157,6 +161,10 @@ impl Serializer for Notification { store!(u16, &8, writer)?; serialize!(NewBlockTemplateNotification, notification, writer)?; } + Notification::MempoolSizeChanged(notification) => { + store!(u16, &9, writer)?; + serialize!(MempoolSizeChangedNotification, notification, writer)?; + } } Ok(()) } @@ -202,6 +210,10 @@ impl Deserializer for Notification { let notification = deserialize!(NewBlockTemplateNotification, reader)?; Ok(Notification::NewBlockTemplate(notification)) } + 9 => { + let notification = deserialize!(MempoolSizeChangedNotification, reader)?; + Ok(Notification::MempoolSizeChanged(notification)) + } _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid variant")), } } diff --git a/rpc/core/src/api/ops.rs b/rpc/core/src/api/ops.rs index 4541ddc56d..f4b4966519 100644 --- a/rpc/core/src/api/ops.rs +++ b/rpc/core/src/api/ops.rs @@ -40,6 +40,7 @@ pub enum RpcApiOps { NotifyVirtualDaaScoreChanged = 16, NotifyVirtualChainChanged = 17, NotifySinkBlueScoreChanged = 18, + NotifyMempoolSizeChanged = 19, // Notification ops required by wRPC @@ -54,6 +55,7 @@ pub enum RpcApiOps { VirtualDaaScoreChangedNotification = 66, PruningPointUtxoSetOverrideNotification = 67, NewBlockTemplateNotification = 68, + MempoolSizeChangedNotification = 69, // RPC methods /// Ping the node to check if connection is alive @@ -153,6 +155,7 @@ impl RpcApiOps { | RpcApiOps::NotifyFinalityConflictResolved | RpcApiOps::NotifySinkBlueScoreChanged | RpcApiOps::NotifyVirtualDaaScoreChanged + | RpcApiOps::NotifyMempoolSizeChanged | RpcApiOps::Subscribe | RpcApiOps::Unsubscribe ) @@ -177,6 +180,7 @@ impl From for RpcApiOps { EventType::UtxosChanged => RpcApiOps::UtxosChangedNotification, EventType::SinkBlueScoreChanged => RpcApiOps::SinkBlueScoreChangedNotification, EventType::VirtualDaaScoreChanged => RpcApiOps::VirtualDaaScoreChangedNotification, + EventType::MempoolSizeChanged => RpcApiOps::MempoolSizeChangedNotification, EventType::PruningPointUtxoSetOverride => RpcApiOps::PruningPointUtxoSetOverrideNotification, EventType::NewBlockTemplate => RpcApiOps::NewBlockTemplateNotification, } diff --git a/rpc/core/src/convert/notification.rs b/rpc/core/src/convert/notification.rs index 6251cc1cdf..386db7fe0f 100644 --- a/rpc/core/src/convert/notification.rs +++ b/rpc/core/src/convert/notification.rs @@ -2,8 +2,9 @@ use crate::{ convert::utxo::utxo_set_into_rpc, BlockAddedNotification, FinalityConflictNotification, FinalityConflictResolvedNotification, - NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, RpcAcceptedTransactionIds, - SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification, VirtualDaaScoreChangedNotification, + MempoolSizeChangedNotification, NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, + RpcAcceptedTransactionIds, SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification, + VirtualDaaScoreChangedNotification, }; use kaspa_consensus_notify::notification as consensus_notify; use kaspa_index_core::notification as index_notify; @@ -31,6 +32,7 @@ impl From<&consensus_notify::Notification> for Notification { consensus_notify::Notification::VirtualDaaScoreChanged(msg) => Notification::VirtualDaaScoreChanged(msg.into()), consensus_notify::Notification::PruningPointUtxoSetOverride(msg) => Notification::PruningPointUtxoSetOverride(msg.into()), consensus_notify::Notification::NewBlockTemplate(msg) => Notification::NewBlockTemplate(msg.into()), + consensus_notify::Notification::MempoolSizeChanged(msg) => Notification::MempoolSizeChanged(msg.into()), } } } @@ -112,6 +114,12 @@ impl From<&consensus_notify::NewBlockTemplateNotification> for NewBlockTemplateN } } +impl From<&consensus_notify::MempoolSizeChangedNotification> for MempoolSizeChangedNotification { + fn from(item: &consensus_notify::MempoolSizeChangedNotification) -> Self { + Self { network_mempool_size: item.network_mempool_size } + } +} + // ---------------------------------------------------------------------------- // index to rpc_core // ---------------------------------------------------------------------------- diff --git a/rpc/core/src/convert/scope.rs b/rpc/core/src/convert/scope.rs index 6d94de326f..70af4816df 100644 --- a/rpc/core/src/convert/scope.rs +++ b/rpc/core/src/convert/scope.rs @@ -1,9 +1,9 @@ //! Conversion of Notification Scope related types use crate::{ - NotifyBlockAddedRequest, NotifyFinalityConflictRequest, NotifyNewBlockTemplateRequest, NotifyPruningPointUtxoSetOverrideRequest, - NotifySinkBlueScoreChangedRequest, NotifyUtxosChangedRequest, NotifyVirtualChainChangedRequest, - NotifyVirtualDaaScoreChangedRequest, + NotifyBlockAddedRequest, NotifyFinalityConflictRequest, NotifyMempoolSizeChangedRequest, NotifyNewBlockTemplateRequest, + NotifyPruningPointUtxoSetOverrideRequest, NotifySinkBlueScoreChangedRequest, NotifyUtxosChangedRequest, + NotifyVirtualChainChangedRequest, NotifyVirtualDaaScoreChangedRequest, }; use kaspa_notify::scope::*; @@ -62,3 +62,4 @@ from!(SinkBlueScoreChanged); from!(VirtualDaaScoreChanged); from!(PruningPointUtxoSetOverride); from!(NewBlockTemplate); +from!(MempoolSizeChanged); diff --git a/rpc/core/src/model/message.rs b/rpc/core/src/model/message.rs index cb663c394a..40771a7ffd 100644 --- a/rpc/core/src/model/message.rs +++ b/rpc/core/src/model/message.rs @@ -3321,6 +3321,81 @@ impl Deserializer for VirtualDaaScoreChangedNotification { } } +// NotifyMempoolSizeChangedRequest registers this connection for +// mempoolSizeChanged notifications. +// +// See: MempoolSizeChangedNotification +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NotifyMempoolSizeChangedRequest { + pub command: Command, +} + +impl NotifyMempoolSizeChangedRequest { + pub fn new(command: Command) -> Self { + Self { command } + } +} + +impl Serializer for NotifyMempoolSizeChangedRequest { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + store!(u16, &1, writer)?; + store!(Command, &self.command, writer)?; + Ok(()) + } +} + +impl Deserializer for NotifyMempoolSizeChangedRequest { + fn deserialize(reader: &mut R) -> std::io::Result { + let _version = load!(u16, reader)?; + let command = load!(Command, reader)?; + Ok(Self { command }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NotifyMempoolSizeChangedResponse {} + +impl Serializer for NotifyMempoolSizeChangedResponse { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + store!(u16, &1, writer)?; + Ok(()) + } +} + +impl Deserializer for NotifyMempoolSizeChangedResponse { + fn deserialize(reader: &mut R) -> std::io::Result { + let _version = load!(u16, reader)?; + Ok(Self {}) + } +} + +// MempoolSizeChangedNotification is sent whenever the mempool changes. +// +// See NotifyMempoolSizeChangedRequest +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MempoolSizeChangedNotification { + pub network_mempool_size: u64, +} + +impl Serializer for MempoolSizeChangedNotification { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + store!(u16, &1, writer)?; + store!(u64, &self.network_mempool_size, writer)?; + Ok(()) + } +} + +impl Deserializer for MempoolSizeChangedNotification { + fn deserialize(reader: &mut R) -> std::io::Result { + let _version = load!(u16, reader)?; + let network_mempool_size = load!(u64, reader)?; + Ok(Self { network_mempool_size }) + } +} + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // PruningPointUtxoSetOverrideNotification diff --git a/rpc/core/src/model/tests.rs b/rpc/core/src/model/tests.rs index d931f5ac23..9f2d79a21a 100644 --- a/rpc/core/src/model/tests.rs +++ b/rpc/core/src/model/tests.rs @@ -1290,6 +1290,30 @@ mod mockery { } } + impl Mock for NotifyMempoolSizeChangedRequest { + fn mock() -> Self { + NotifyMempoolSizeChangedRequest { command: Command::Start } + } + } + + test!(NotifyMempoolSizeChangedRequest); + + impl Mock for NotifyMempoolSizeChangedResponse { + fn mock() -> Self { + NotifyMempoolSizeChangedResponse {} + } + } + + test!(NotifyMempoolSizeChangedResponse); + + impl Mock for MempoolSizeChangedNotification { + fn mock() -> Self { + MempoolSizeChangedNotification { network_mempool_size: mock() } + } + } + + test!(MempoolSizeChangedNotification); + test!(SubscribeResponse); impl Mock for UnsubscribeResponse { diff --git a/rpc/grpc/core/proto/messages.proto b/rpc/grpc/core/proto/messages.proto index ccb2798b67..3b64d3299d 100644 --- a/rpc/grpc/core/proto/messages.proto +++ b/rpc/grpc/core/proto/messages.proto @@ -66,6 +66,7 @@ message KaspadRequest { GetFeeEstimateExperimentalRequestMessage getFeeEstimateExperimentalRequest = 1108; GetCurrentBlockColorRequestMessage getCurrentBlockColorRequest = 1110; GetUtxoReturnAddressRequestMessage GetUtxoReturnAddressRequest = 1112; + NotifyMempoolSizeChangedRequestMessage notifyMempoolSizeChangedRequest = 1114; } } @@ -132,6 +133,8 @@ message KaspadResponse { GetFeeEstimateExperimentalResponseMessage getFeeEstimateExperimentalResponse = 1109; GetCurrentBlockColorResponseMessage getCurrentBlockColorResponse = 1111; GetUtxoReturnAddressResponseMessage GetUtxoReturnAddressResponse = 1113; + NotifyMempoolSizeChangedResponseMessage notifyMempoolSizeChangedResponse = 1115; + MempoolSizeChangedNotificationMessage mempoolSizeChangedNotification = 1117; } } diff --git a/rpc/grpc/core/proto/rpc.proto b/rpc/grpc/core/proto/rpc.proto index 4c36150ad3..bd31531c5f 100644 --- a/rpc/grpc/core/proto/rpc.proto +++ b/rpc/grpc/core/proto/rpc.proto @@ -636,6 +636,25 @@ message VirtualDaaScoreChangedNotificationMessage { uint64 virtualDaaScore = 1; } +// NotifyMempoolSizeChangedRequestMessage registers this connection for +// mempoolSizeChanged notifications. +// +// See: MempoolSizeChangedNotificationMessage +message NotifyMempoolSizeChangedRequestMessage { + RpcNotifyCommand command = 101; +} + +message NotifyMempoolSizeChangedResponseMessage { + RPCError error = 1000; +} + +// MempoolSizeChangedNotificationMessage is sent whenever the mempool changes. +// +// See NotifyMempoolSizeChangedRequestMessage +message MempoolSizeChangedNotificationMessage { + uint64 networkMempoolSize = 1; +} + // NotifyPruningPointUtxoSetOverrideRequestMessage registers this connection for // pruning point UTXO set override notifications. // diff --git a/rpc/grpc/core/src/convert/kaspad.rs b/rpc/grpc/core/src/convert/kaspad.rs index 7243fd401a..9597827730 100644 --- a/rpc/grpc/core/src/convert/kaspad.rs +++ b/rpc/grpc/core/src/convert/kaspad.rs @@ -73,6 +73,7 @@ pub mod kaspad_request_convert { impl_into_kaspad_request!(NotifyVirtualDaaScoreChanged); impl_into_kaspad_request!(NotifyVirtualChainChanged); impl_into_kaspad_request!(NotifySinkBlueScoreChanged); + impl_into_kaspad_request!(NotifyMempoolSizeChanged); macro_rules! impl_into_kaspad_request { ($name:tt) => { @@ -214,6 +215,7 @@ pub mod kaspad_response_convert { impl_into_kaspad_notify_response!(NotifyUtxosChanged, StopNotifyingUtxosChanged); impl_into_kaspad_notify_response!(NotifyPruningPointUtxoSetOverride, StopNotifyingPruningPointUtxoSetOverride); + impl_into_kaspad_notify_response!(NotifyMempoolSizeChanged); macro_rules! impl_into_kaspad_response { ($name:tt) => { diff --git a/rpc/grpc/core/src/convert/message.rs b/rpc/grpc/core/src/convert/message.rs index c92e824ed0..70b1f5467e 100644 --- a/rpc/grpc/core/src/convert/message.rs +++ b/rpc/grpc/core/src/convert/message.rs @@ -559,6 +559,10 @@ from!(item: &kaspa_rpc_core::NotifySinkBlueScoreChangedRequest, protowire::Notif Self { command: item.command.into() } }); from!(RpcResult<&kaspa_rpc_core::NotifySinkBlueScoreChangedResponse>, protowire::NotifySinkBlueScoreChangedResponseMessage); +from!(item: &kaspa_rpc_core::NotifyMempoolSizeChangedRequest, protowire::NotifyMempoolSizeChangedRequestMessage, { + Self { command: item.command.into() } +}); +from!(RpcResult<&kaspa_rpc_core::NotifyMempoolSizeChangedResponse>, protowire::NotifyMempoolSizeChangedResponseMessage); // ---------------------------------------------------------------------------- // protowire to rpc_core @@ -1060,6 +1064,10 @@ try_from!(item: &protowire::NotifySinkBlueScoreChangedRequestMessage, kaspa_rpc_ Self { command: item.command.into() } }); try_from!(&protowire::NotifySinkBlueScoreChangedResponseMessage, RpcResult); +try_from!(item: &protowire::NotifyMempoolSizeChangedRequestMessage, kaspa_rpc_core::NotifyMempoolSizeChangedRequest, { + Self { command: item.command.into() } +}); +try_from!(&protowire::NotifyMempoolSizeChangedResponseMessage, RpcResult); // ---------------------------------------------------------------------------- // Unit tests diff --git a/rpc/grpc/core/src/convert/notification.rs b/rpc/grpc/core/src/convert/notification.rs index 2f2273af1c..e722cd2497 100644 --- a/rpc/grpc/core/src/convert/notification.rs +++ b/rpc/grpc/core/src/convert/notification.rs @@ -2,12 +2,13 @@ use crate::protowire::{ kaspad_response::Payload, BlockAddedNotificationMessage, KaspadResponse, NewBlockTemplateNotificationMessage, RpcNotifyCommand, }; use crate::protowire::{ - FinalityConflictNotificationMessage, FinalityConflictResolvedNotificationMessage, NotifyPruningPointUtxoSetOverrideRequestMessage, - NotifyPruningPointUtxoSetOverrideResponseMessage, NotifyUtxosChangedRequestMessage, NotifyUtxosChangedResponseMessage, - PruningPointUtxoSetOverrideNotificationMessage, SinkBlueScoreChangedNotificationMessage, - StopNotifyingPruningPointUtxoSetOverrideRequestMessage, StopNotifyingPruningPointUtxoSetOverrideResponseMessage, - StopNotifyingUtxosChangedRequestMessage, StopNotifyingUtxosChangedResponseMessage, UtxosChangedNotificationMessage, - VirtualChainChangedNotificationMessage, VirtualDaaScoreChangedNotificationMessage, + FinalityConflictNotificationMessage, FinalityConflictResolvedNotificationMessage, MempoolSizeChangedNotificationMessage, + NotifyPruningPointUtxoSetOverrideRequestMessage, NotifyPruningPointUtxoSetOverrideResponseMessage, + NotifyUtxosChangedRequestMessage, NotifyUtxosChangedResponseMessage, PruningPointUtxoSetOverrideNotificationMessage, + SinkBlueScoreChangedNotificationMessage, StopNotifyingPruningPointUtxoSetOverrideRequestMessage, + StopNotifyingPruningPointUtxoSetOverrideResponseMessage, StopNotifyingUtxosChangedRequestMessage, + StopNotifyingUtxosChangedResponseMessage, UtxosChangedNotificationMessage, VirtualChainChangedNotificationMessage, + VirtualDaaScoreChangedNotificationMessage, }; use crate::{from, try_from}; use kaspa_notify::subscription::Command; @@ -31,6 +32,9 @@ from!(item: &kaspa_rpc_core::Notification, Payload, { Notification::UtxosChanged(ref notification) => Payload::UtxosChangedNotification(notification.into()), Notification::SinkBlueScoreChanged(ref notification) => Payload::SinkBlueScoreChangedNotification(notification.into()), Notification::VirtualDaaScoreChanged(ref notification) => Payload::VirtualDaaScoreChangedNotification(notification.into()), + Notification::MempoolSizeChanged(ref notification) => { + Payload::MempoolSizeChangedNotification(notification.into()) + } Notification::PruningPointUtxoSetOverride(ref notification) => { Payload::PruningPointUtxoSetOverrideNotification(notification.into()) } @@ -72,6 +76,10 @@ from!(item: &kaspa_rpc_core::VirtualDaaScoreChangedNotification, VirtualDaaScore Self { virtual_daa_score: item.virtual_daa_score } }); +from!(item: &kaspa_rpc_core::MempoolSizeChangedNotification, MempoolSizeChangedNotificationMessage, { + Self { network_mempool_size: item.network_mempool_size } +}); + from!(&kaspa_rpc_core::PruningPointUtxoSetOverrideNotification, PruningPointUtxoSetOverrideNotificationMessage); from!(item: Command, RpcNotifyCommand, { @@ -117,6 +125,9 @@ try_from!(item: &Payload, kaspa_rpc_core::Notification, { Payload::PruningPointUtxoSetOverrideNotification(ref notification) => { Notification::PruningPointUtxoSetOverride(notification.try_into()?) } + Payload::MempoolSizeChangedNotification(ref notification) => { + Notification::MempoolSizeChanged(notification.try_into()?) + }, _ => Err(RpcError::UnsupportedFeature)?, } }); @@ -169,6 +180,10 @@ try_from!(item: &VirtualDaaScoreChangedNotificationMessage, kaspa_rpc_core::Virt Self { virtual_daa_score: item.virtual_daa_score } }); +try_from!(item: &MempoolSizeChangedNotificationMessage, kaspa_rpc_core::MempoolSizeChangedNotification, { + Self { network_mempool_size: item.network_mempool_size } +}); + try_from!(&PruningPointUtxoSetOverrideNotificationMessage, kaspa_rpc_core::PruningPointUtxoSetOverrideNotification); from!(item: RpcNotifyCommand, Command, { diff --git a/rpc/grpc/core/src/ext/kaspad.rs b/rpc/grpc/core/src/ext/kaspad.rs index a91ae4c2d7..8489056b8e 100644 --- a/rpc/grpc/core/src/ext/kaspad.rs +++ b/rpc/grpc/core/src/ext/kaspad.rs @@ -2,9 +2,9 @@ use kaspa_notify::{scope::Scope, subscription::Command}; use crate::protowire::{ kaspad_request, kaspad_response, KaspadRequest, KaspadResponse, NotifyBlockAddedRequestMessage, - NotifyFinalityConflictRequestMessage, NotifyNewBlockTemplateRequestMessage, NotifyPruningPointUtxoSetOverrideRequestMessage, - NotifySinkBlueScoreChangedRequestMessage, NotifyUtxosChangedRequestMessage, NotifyVirtualChainChangedRequestMessage, - NotifyVirtualDaaScoreChangedRequestMessage, + NotifyFinalityConflictRequestMessage, NotifyMempoolSizeChangedRequestMessage, NotifyNewBlockTemplateRequestMessage, + NotifyPruningPointUtxoSetOverrideRequestMessage, NotifySinkBlueScoreChangedRequestMessage, NotifyUtxosChangedRequestMessage, + NotifyVirtualChainChangedRequestMessage, NotifyVirtualDaaScoreChangedRequestMessage, }; impl KaspadRequest { @@ -64,6 +64,11 @@ impl kaspad_request::Payload { command: command.into(), }) } + Scope::MempoolSizeChanged(_) => { + kaspad_request::Payload::NotifyMempoolSizeChangedRequest(NotifyMempoolSizeChangedRequestMessage { + command: command.into(), + }) + } } } @@ -79,6 +84,7 @@ impl kaspad_request::Payload { | Payload::NotifyVirtualDaaScoreChangedRequest(_) | Payload::NotifyPruningPointUtxoSetOverrideRequest(_) | Payload::NotifyNewBlockTemplateRequest(_) + | Payload::NotifyMempoolSizeChangedRequest(_) | Payload::StopNotifyingUtxosChangedRequest(_) | Payload::StopNotifyingPruningPointUtxoSetOverrideRequest(_) ) @@ -108,6 +114,7 @@ impl kaspad_response::Payload { Payload::VirtualDaaScoreChangedNotification(_) => true, Payload::PruningPointUtxoSetOverrideNotification(_) => true, Payload::NewBlockTemplateNotification(_) => true, + Payload::MempoolSizeChangedNotification(_) => true, _ => false, } } diff --git a/rpc/grpc/core/src/ops.rs b/rpc/grpc/core/src/ops.rs index 223774c74c..c95ae8a46c 100644 --- a/rpc/grpc/core/src/ops.rs +++ b/rpc/grpc/core/src/ops.rs @@ -98,6 +98,7 @@ pub enum KaspadPayloadOps { NotifyPruningPointUtxoSetOverride, NotifyVirtualDaaScoreChanged, NotifyVirtualChainChanged, + NotifyMempoolSizeChanged, // Legacy stop subscription commands StopNotifyingUtxosChanged, diff --git a/rpc/grpc/server/src/request_handler/factory.rs b/rpc/grpc/server/src/request_handler/factory.rs index 9fec86e476..7a30a62f61 100644 --- a/rpc/grpc/server/src/request_handler/factory.rs +++ b/rpc/grpc/server/src/request_handler/factory.rs @@ -90,6 +90,7 @@ impl Factory { NotifyPruningPointUtxoSetOverride, NotifyVirtualDaaScoreChanged, NotifyVirtualChainChanged, + NotifyMempoolSizeChanged, StopNotifyingUtxosChanged, StopNotifyingPruningPointUtxoSetOverride, ] diff --git a/rpc/wrpc/client/src/client.rs b/rpc/wrpc/client/src/client.rs index f22bcf6255..2ddb9ce212 100644 --- a/rpc/wrpc/client/src/client.rs +++ b/rpc/wrpc/client/src/client.rs @@ -76,6 +76,7 @@ impl Inner { RpcApiOps::VirtualDaaScoreChangedNotification, RpcApiOps::PruningPointUtxoSetOverrideNotification, RpcApiOps::NewBlockTemplateNotification, + RpcApiOps::MempoolSizeChangedNotification, ] .into_iter() .for_each(|notification_op| { diff --git a/testing/integration/src/rpc_tests.rs b/testing/integration/src/rpc_tests.rs index 3ca619423a..dbce10312f 100644 --- a/testing/integration/src/rpc_tests.rs +++ b/testing/integration/src/rpc_tests.rs @@ -11,8 +11,8 @@ use kaspa_hashes::Hash; use kaspa_notify::{ connection::{ChannelConnection, ChannelType}, scope::{ - BlockAddedScope, FinalityConflictScope, NewBlockTemplateScope, PruningPointUtxoSetOverrideScope, Scope, - SinkBlueScoreChangedScope, UtxosChangedScope, VirtualChainChangedScope, VirtualDaaScoreChangedScope, + BlockAddedScope, FinalityConflictScope, MempoolSizeChangedScope, NewBlockTemplateScope, PruningPointUtxoSetOverrideScope, + Scope, SinkBlueScoreChangedScope, UtxosChangedScope, VirtualChainChangedScope, VirtualDaaScoreChangedScope, }, }; use kaspa_rpc_core::{api::rpc::RpcApi, model::*, Notification}; @@ -734,6 +734,13 @@ async fn sanity_test() { .unwrap(); }) } + KaspadPayloadOps::NotifyMempoolSizeChanged => { + let rpc_client = client.clone(); + let id = listener_id; + tst!(op, { + rpc_client.start_notify(id, MempoolSizeChangedScope {}.into()).await.unwrap(); + }) + } KaspadPayloadOps::StopNotifyingUtxosChanged => { let rpc_client = client.clone(); let id = listener_id;