Skip to content

expanded rpc api to add MempoolSizeChanged #663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub enum Notification {

#[display(fmt = "NewBlockTemplate notification")]
NewBlockTemplate(NewBlockTemplateNotification),

#[display(fmt = "MempoolSizeChanged notification: size {}", "_0.network_mempool_size")]
MempoolSizeChanged(MempoolSizeChangedNotification),
}
}

Expand Down Expand Up @@ -181,3 +184,14 @@ pub struct PruningPointUtxoSetOverrideNotification {}

#[derive(Debug, Clone)]
pub struct NewBlockTemplateNotification {}

#[derive(Debug, Clone)]
pub struct MempoolSizeChangedNotification {
pub network_mempool_size: u64,
}

impl MempoolSizeChangedNotification {
pub fn new(network_mempool_size: u64) -> Self {
Self { network_mempool_size }
}
}
4 changes: 3 additions & 1 deletion notify/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ event_type_enum! {
VirtualDaaScoreChanged,
PruningPointUtxoSetOverride,
NewBlockTemplate,
MempoolSizeChanged,
}
}

pub const EVENT_COUNT: usize = 9;
pub const EVENT_COUNT: usize = 10;

impl FromStr for EventType {
type Err = Error;
Expand All @@ -70,6 +71,7 @@ impl FromStr for EventType {
"virtual-daa-score-changed" => Ok(EventType::VirtualDaaScoreChanged),
"pruning-point-utxo-set-override" => Ok(EventType::PruningPointUtxoSetOverride),
"new-block-template" => Ok(EventType::NewBlockTemplate),
"mempool-size-changed" => Ok(EventType::MempoolSizeChanged),
_ => Err(Error::InvalidEventType(s.to_string())),
}
}
Expand Down
18 changes: 18 additions & 0 deletions notify/src/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub enum Scope {
VirtualDaaScoreChanged,
PruningPointUtxoSetOverride,
NewBlockTemplate,
MempoolSizeChanged,
}
}

Expand Down Expand Up @@ -266,3 +267,20 @@ impl Deserializer for NewBlockTemplateScope {
Ok(Self {})
}
}

#[derive(Clone, Display, Debug, Default, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
pub struct MempoolSizeChangedScope {}

impl Serializer for MempoolSizeChangedScope {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
store!(u16, &1, writer)?;
Ok(())
}
}

impl Deserializer for MempoolSizeChangedScope {
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let _version = load!(u16, reader)?;
Ok(Self {})
}
}
15 changes: 13 additions & 2 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use kaspa_consensus_core::config::Config;
use kaspa_consensus_core::errors::block::RuleError;
use kaspa_consensus_core::tx::{Transaction, TransactionId};
use kaspa_consensus_notify::{
notification::{Notification, PruningPointUtxoSetOverrideNotification},
notification::{MempoolSizeChangedNotification, Notification, PruningPointUtxoSetOverrideNotification},
root::ConsensusNotificationRoot,
};
use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy, ConsensusSessionOwned};
Expand All @@ -26,6 +26,7 @@ use kaspa_core::{
use kaspa_core::{time::unix_now, warn};
use kaspa_hashes::Hash;
use kaspa_mining::mempool::tx::{Orphan, Priority};
use kaspa_mining::model::tx_query::TransactionQuery;
use kaspa_mining::{manager::MiningManagerProxy, mempool::tx::RbfPolicy};
use kaspa_notify::notifier::Notify;
use kaspa_p2p_lib::{
Expand Down Expand Up @@ -624,6 +625,7 @@ impl FlowContext {
.await;
}
}
context.on_transaction_added_to_mempool().await;
context.mempool_scanning_is_done().await;
debug!("<> Mempool scanning task is done");
});
Expand All @@ -649,7 +651,10 @@ impl FlowContext {

/// Notifies that a transaction has been added to the mempool.
pub async fn on_transaction_added_to_mempool(&self) {
// TODO: call a handler function or a predefined registered service
let network_mempool_size = self.mining_manager().clone().transaction_count(TransactionQuery::TransactionsOnly).await as u64;

let _ =
self.notification_root.notify(Notification::MempoolSizeChanged(MempoolSizeChangedNotification::new(network_mempool_size)));
}

/// Adds the rpc-submitted transaction to the mempool and propagates it to peers.
Expand All @@ -673,6 +678,9 @@ impl FlowContext {
false, // RPC transactions are considered high priority, so we don't want to throttle them
)
.await;

self.on_transaction_added_to_mempool().await;

Ok(())
}

Expand All @@ -698,6 +706,9 @@ impl FlowContext {
false, // RPC transactions are considered high priority, so we don't want to throttle them
)
.await;

self.on_transaction_added_to_mempool().await;

// The combination of args above of Orphan::Forbidden and RbfPolicy::Mandatory should always result
// in a removed transaction returned, however we prefer failing gracefully in case of future internal mempool changes
transaction_insertion.removed.ok_or(ProtocolError::Other(
Expand Down
12 changes: 12 additions & 0 deletions rpc/core/src/api/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub enum Notification {

#[display(fmt = "NewBlockTemplate notification")]
NewBlockTemplate(NewBlockTemplateNotification),

#[display(fmt = "MempoolSizeChanged notification: size {}", "_0.network_mempool_size")]
MempoolSizeChanged(MempoolSizeChangedNotification),
}
}

Expand All @@ -64,6 +67,7 @@ impl Notification {
Notification::VirtualDaaScoreChanged(v) => to_value(&v),
Notification::SinkBlueScoreChanged(v) => to_value(&v),
Notification::VirtualChainChanged(v) => to_value(&v),
Notification::MempoolSizeChanged(v) => to_value(&v),
}
}
}
Expand Down Expand Up @@ -157,6 +161,10 @@ impl Serializer for Notification {
store!(u16, &8, writer)?;
serialize!(NewBlockTemplateNotification, notification, writer)?;
}
Notification::MempoolSizeChanged(notification) => {
store!(u16, &9, writer)?;
serialize!(MempoolSizeChangedNotification, notification, writer)?;
}
}
Ok(())
}
Expand Down Expand Up @@ -202,6 +210,10 @@ impl Deserializer for Notification {
let notification = deserialize!(NewBlockTemplateNotification, reader)?;
Ok(Notification::NewBlockTemplate(notification))
}
9 => {
let notification = deserialize!(MempoolSizeChangedNotification, reader)?;
Ok(Notification::MempoolSizeChanged(notification))
}
_ => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid variant")),
}
}
Expand Down
4 changes: 4 additions & 0 deletions rpc/core/src/api/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum RpcApiOps {
NotifyVirtualDaaScoreChanged = 16,
NotifyVirtualChainChanged = 17,
NotifySinkBlueScoreChanged = 18,
NotifyMempoolSizeChanged = 19,

// Notification ops required by wRPC

Expand All @@ -54,6 +55,7 @@ pub enum RpcApiOps {
VirtualDaaScoreChangedNotification = 66,
PruningPointUtxoSetOverrideNotification = 67,
NewBlockTemplateNotification = 68,
MempoolSizeChangedNotification = 69,

// RPC methods
/// Ping the node to check if connection is alive
Expand Down Expand Up @@ -153,6 +155,7 @@ impl RpcApiOps {
| RpcApiOps::NotifyFinalityConflictResolved
| RpcApiOps::NotifySinkBlueScoreChanged
| RpcApiOps::NotifyVirtualDaaScoreChanged
| RpcApiOps::NotifyMempoolSizeChanged
| RpcApiOps::Subscribe
| RpcApiOps::Unsubscribe
)
Expand All @@ -177,6 +180,7 @@ impl From<EventType> for RpcApiOps {
EventType::UtxosChanged => RpcApiOps::UtxosChangedNotification,
EventType::SinkBlueScoreChanged => RpcApiOps::SinkBlueScoreChangedNotification,
EventType::VirtualDaaScoreChanged => RpcApiOps::VirtualDaaScoreChangedNotification,
EventType::MempoolSizeChanged => RpcApiOps::MempoolSizeChangedNotification,
EventType::PruningPointUtxoSetOverride => RpcApiOps::PruningPointUtxoSetOverrideNotification,
EventType::NewBlockTemplate => RpcApiOps::NewBlockTemplateNotification,
}
Expand Down
12 changes: 10 additions & 2 deletions rpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use crate::{
convert::utxo::utxo_set_into_rpc, BlockAddedNotification, FinalityConflictNotification, FinalityConflictResolvedNotification,
NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification, RpcAcceptedTransactionIds,
SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification, VirtualDaaScoreChangedNotification,
MempoolSizeChangedNotification, NewBlockTemplateNotification, Notification, PruningPointUtxoSetOverrideNotification,
RpcAcceptedTransactionIds, SinkBlueScoreChangedNotification, UtxosChangedNotification, VirtualChainChangedNotification,
VirtualDaaScoreChangedNotification,
};
use kaspa_consensus_notify::notification as consensus_notify;
use kaspa_index_core::notification as index_notify;
Expand Down Expand Up @@ -31,6 +32,7 @@ impl From<&consensus_notify::Notification> for Notification {
consensus_notify::Notification::VirtualDaaScoreChanged(msg) => Notification::VirtualDaaScoreChanged(msg.into()),
consensus_notify::Notification::PruningPointUtxoSetOverride(msg) => Notification::PruningPointUtxoSetOverride(msg.into()),
consensus_notify::Notification::NewBlockTemplate(msg) => Notification::NewBlockTemplate(msg.into()),
consensus_notify::Notification::MempoolSizeChanged(msg) => Notification::MempoolSizeChanged(msg.into()),
}
}
}
Expand Down Expand Up @@ -112,6 +114,12 @@ impl From<&consensus_notify::NewBlockTemplateNotification> for NewBlockTemplateN
}
}

impl From<&consensus_notify::MempoolSizeChangedNotification> for MempoolSizeChangedNotification {
fn from(item: &consensus_notify::MempoolSizeChangedNotification) -> Self {
Self { network_mempool_size: item.network_mempool_size }
}
}

// ----------------------------------------------------------------------------
// index to rpc_core
// ----------------------------------------------------------------------------
Expand Down
7 changes: 4 additions & 3 deletions rpc/core/src/convert/scope.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Conversion of Notification Scope related types

use crate::{
NotifyBlockAddedRequest, NotifyFinalityConflictRequest, NotifyNewBlockTemplateRequest, NotifyPruningPointUtxoSetOverrideRequest,
NotifySinkBlueScoreChangedRequest, NotifyUtxosChangedRequest, NotifyVirtualChainChangedRequest,
NotifyVirtualDaaScoreChangedRequest,
NotifyBlockAddedRequest, NotifyFinalityConflictRequest, NotifyMempoolSizeChangedRequest, NotifyNewBlockTemplateRequest,
NotifyPruningPointUtxoSetOverrideRequest, NotifySinkBlueScoreChangedRequest, NotifyUtxosChangedRequest,
NotifyVirtualChainChangedRequest, NotifyVirtualDaaScoreChangedRequest,
};
use kaspa_notify::scope::*;

Expand Down Expand Up @@ -62,3 +62,4 @@ from!(SinkBlueScoreChanged);
from!(VirtualDaaScoreChanged);
from!(PruningPointUtxoSetOverride);
from!(NewBlockTemplate);
from!(MempoolSizeChanged);
75 changes: 75 additions & 0 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3321,6 +3321,81 @@ impl Deserializer for VirtualDaaScoreChangedNotification {
}
}

// NotifyMempoolSizeChangedRequest registers this connection for
// mempoolSizeChanged notifications.
//
// See: MempoolSizeChangedNotification
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NotifyMempoolSizeChangedRequest {
pub command: Command,
}

impl NotifyMempoolSizeChangedRequest {
pub fn new(command: Command) -> Self {
Self { command }
}
}

impl Serializer for NotifyMempoolSizeChangedRequest {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
store!(u16, &1, writer)?;
store!(Command, &self.command, writer)?;
Ok(())
}
}

impl Deserializer for NotifyMempoolSizeChangedRequest {
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let _version = load!(u16, reader)?;
let command = load!(Command, reader)?;
Ok(Self { command })
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NotifyMempoolSizeChangedResponse {}

impl Serializer for NotifyMempoolSizeChangedResponse {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
store!(u16, &1, writer)?;
Ok(())
}
}

impl Deserializer for NotifyMempoolSizeChangedResponse {
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let _version = load!(u16, reader)?;
Ok(Self {})
}
}

// MempoolSizeChangedNotification is sent whenever the mempool changes.
//
// See NotifyMempoolSizeChangedRequest
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MempoolSizeChangedNotification {
pub network_mempool_size: u64,
}

impl Serializer for MempoolSizeChangedNotification {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
store!(u16, &1, writer)?;
store!(u64, &self.network_mempool_size, writer)?;
Ok(())
}
}

impl Deserializer for MempoolSizeChangedNotification {
fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let _version = load!(u16, reader)?;
let network_mempool_size = load!(u64, reader)?;
Ok(Self { network_mempool_size })
}
}

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// PruningPointUtxoSetOverrideNotification

Expand Down
24 changes: 24 additions & 0 deletions rpc/core/src/model/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,30 @@ mod mockery {
}
}

impl Mock for NotifyMempoolSizeChangedRequest {
fn mock() -> Self {
NotifyMempoolSizeChangedRequest { command: Command::Start }
}
}

test!(NotifyMempoolSizeChangedRequest);

impl Mock for NotifyMempoolSizeChangedResponse {
fn mock() -> Self {
NotifyMempoolSizeChangedResponse {}
}
}

test!(NotifyMempoolSizeChangedResponse);

impl Mock for MempoolSizeChangedNotification {
fn mock() -> Self {
MempoolSizeChangedNotification { network_mempool_size: mock() }
}
}

test!(MempoolSizeChangedNotification);

test!(SubscribeResponse);

impl Mock for UnsubscribeResponse {
Expand Down
3 changes: 3 additions & 0 deletions rpc/grpc/core/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ message KaspadRequest {
GetFeeEstimateExperimentalRequestMessage getFeeEstimateExperimentalRequest = 1108;
GetCurrentBlockColorRequestMessage getCurrentBlockColorRequest = 1110;
GetUtxoReturnAddressRequestMessage GetUtxoReturnAddressRequest = 1112;
NotifyMempoolSizeChangedRequestMessage notifyMempoolSizeChangedRequest = 1114;
}
}

Expand Down Expand Up @@ -132,6 +133,8 @@ message KaspadResponse {
GetFeeEstimateExperimentalResponseMessage getFeeEstimateExperimentalResponse = 1109;
GetCurrentBlockColorResponseMessage getCurrentBlockColorResponse = 1111;
GetUtxoReturnAddressResponseMessage GetUtxoReturnAddressResponse = 1113;
NotifyMempoolSizeChangedResponseMessage notifyMempoolSizeChangedResponse = 1115;
MempoolSizeChangedNotificationMessage mempoolSizeChangedNotification = 1117;
}
}

Expand Down
Loading