From 107de2713fde59ef0f0c66657480146082d5f30a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 8 Jan 2025 11:33:19 +0100 Subject: [PATCH 1/2] Introduce `MessageQueueNotifierGuard` type Previously, when enqueuing new messages to the `MessageQueue`, we'd directly notify the BP to handle the messages, potentially causing multiple wake-ups in short succession, and risking that we'd reenter with crucial locks still held. Here, we instead introduce a `MessageQueueNotifierGuard` type that parallels our recently-introduced `EventQueueNotifierGuard`, buffers the messages, and will only append them to the message queue and notify the BP when dropped. This will allow us to remove a lot of error-prone boilerplate in the next step. --- lightning-liquidity/src/lsps0/client.rs | 4 ++- lightning-liquidity/src/lsps0/service.rs | 4 ++- lightning-liquidity/src/lsps1/client.rs | 12 +++++-- lightning-liquidity/src/lsps1/service.rs | 15 +++++--- lightning-liquidity/src/lsps2/client.rs | 8 +++-- lightning-liquidity/src/lsps2/service.rs | 46 +++++++++++++++--------- lightning-liquidity/src/lsps5/client.rs | 9 +++-- lightning-liquidity/src/lsps5/service.rs | 18 ++++++---- lightning-liquidity/src/manager.rs | 4 ++- lightning-liquidity/src/message_queue.rs | 28 ++++++++++++--- 10 files changed, 106 insertions(+), 42 deletions(-) diff --git a/lightning-liquidity/src/lsps0/client.rs b/lightning-liquidity/src/lsps0/client.rs index 5ae73005e61..f7e01b323f3 100644 --- a/lightning-liquidity/src/lsps0/client.rs +++ b/lightning-liquidity/src/lsps0/client.rs @@ -50,12 +50,14 @@ where /// specifcation](https://github.com/lightning/blips/blob/master/blip-0050.md#lsps-specification-support-query) /// for more information. pub fn list_protocols(&self, counterparty_node_id: &PublicKey) { + let mut message_queue_notifier = self.pending_messages.notifier(); + let msg = LSPS0Message::Request( utils::generate_request_id(&self.entropy_source), LSPS0Request::ListProtocols(LSPS0ListProtocolsRequest {}), ); - self.pending_messages.enqueue(counterparty_node_id, msg.into()); + message_queue_notifier.enqueue(counterparty_node_id, msg.into()); } fn handle_response( diff --git a/lightning-liquidity/src/lsps0/service.rs b/lightning-liquidity/src/lsps0/service.rs index 4a595ab3d2f..2b4e6782ce8 100644 --- a/lightning-liquidity/src/lsps0/service.rs +++ b/lightning-liquidity/src/lsps0/service.rs @@ -40,6 +40,8 @@ impl LSPS0ServiceHandler { fn handle_request( &self, request_id: LSPSRequestId, request: LSPS0Request, counterparty_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + match request { LSPS0Request::ListProtocols(_) => { let msg = LSPS0Message::Response( @@ -48,7 +50,7 @@ impl LSPS0ServiceHandler { protocols: self.protocols.clone(), }), ); - self.pending_messages.enqueue(counterparty_node_id, msg.into()); + message_queue_notifier.enqueue(counterparty_node_id, msg.into()); Ok(()) }, } diff --git a/lightning-liquidity/src/lsps1/client.rs b/lightning-liquidity/src/lsps1/client.rs index b1b7b6a2493..fb9ea2ef8f3 100644 --- a/lightning-liquidity/src/lsps1/client.rs +++ b/lightning-liquidity/src/lsps1/client.rs @@ -90,6 +90,8 @@ where /// /// [`SupportedOptionsReady`]: crate::lsps1::event::LSPS1ClientEvent::SupportedOptionsReady pub fn request_supported_options(&self, counterparty_node_id: PublicKey) -> LSPSRequestId { + let mut message_queue_notifier = self.pending_messages.notifier(); + let request_id = crate::utils::generate_request_id(&self.entropy_source); { let mut outer_state_lock = self.per_peer_state.write().unwrap(); @@ -102,7 +104,7 @@ where let request = LSPS1Request::GetInfo(LSPS1GetInfoRequest {}); let msg = LSPS1Message::Request(request_id.clone(), request).into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); request_id } @@ -198,6 +200,8 @@ where &self, counterparty_node_id: &PublicKey, order: LSPS1OrderParams, refund_onchain_address: Option
, ) -> LSPSRequestId { + let mut message_queue_notifier = self.pending_messages.notifier(); + let (request_id, request_msg) = { let mut outer_state_lock = self.per_peer_state.write().unwrap(); let inner_state_lock = outer_state_lock @@ -217,7 +221,7 @@ where }; if let Some(msg) = request_msg { - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); } request_id @@ -322,6 +326,8 @@ where pub fn check_order_status( &self, counterparty_node_id: &PublicKey, order_id: LSPS1OrderId, ) -> LSPSRequestId { + let mut message_queue_notifier = self.pending_messages.notifier(); + let (request_id, request_msg) = { let mut outer_state_lock = self.per_peer_state.write().unwrap(); let inner_state_lock = outer_state_lock @@ -340,7 +346,7 @@ where }; if let Some(msg) = request_msg { - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); } request_id diff --git a/lightning-liquidity/src/lsps1/service.rs b/lightning-liquidity/src/lsps1/service.rs index 28fe72ca905..a7d856eb64c 100644 --- a/lightning-liquidity/src/lsps1/service.rs +++ b/lightning-liquidity/src/lsps1/service.rs @@ -177,6 +177,8 @@ where fn handle_get_info_request( &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + let response = LSPS1Response::GetInfo(LSPS1GetInfoResponse { options: self .config @@ -190,7 +192,7 @@ where }); let msg = LSPS1Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); Ok(()) } @@ -198,7 +200,9 @@ where &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS1CreateOrderRequest, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); let event_queue_notifier = self.pending_events.notifier(); + if !is_valid(¶ms.order, &self.config.supported_options.as_ref().unwrap()) { let response = LSPS1Response::CreateOrderError(LSPSResponseError { code: LSPS1_CREATE_ORDER_REQUEST_ORDER_MISMATCH_ERROR_CODE, @@ -209,7 +213,7 @@ where )), }); let msg = LSPS1Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); return Err(LightningError { err: format!( "Client order does not match any supported options: {:?}", @@ -250,6 +254,7 @@ where &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, payment: LSPS1PaymentInfo, created_at: LSPSDateTime, ) -> Result<(), APIError> { + let mut message_queue_notifier = self.pending_messages.notifier(); let (result, response) = { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -306,7 +311,7 @@ where if let Some(response) = response { let msg = LSPS1Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); } result @@ -376,6 +381,8 @@ where &self, request_id: LSPSRequestId, counterparty_node_id: PublicKey, order_id: LSPS1OrderId, order_state: LSPS1OrderState, channel: Option, ) -> Result<(), APIError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + let (result, response) = { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -420,7 +427,7 @@ where if let Some(response) = response { let msg = LSPS1Message::Response(request_id, response).into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); } result diff --git a/lightning-liquidity/src/lsps2/client.rs b/lightning-liquidity/src/lsps2/client.rs index bbe313d6089..fa08093108b 100644 --- a/lightning-liquidity/src/lsps2/client.rs +++ b/lightning-liquidity/src/lsps2/client.rs @@ -118,6 +118,8 @@ where pub fn request_opening_params( &self, counterparty_node_id: PublicKey, token: Option, ) -> LSPSRequestId { + let mut message_queue_notifier = self.pending_messages.notifier(); + let request_id = crate::utils::generate_request_id(&self.entropy_source); { @@ -131,7 +133,7 @@ where let request = LSPS2Request::GetInfo(LSPS2GetInfoRequest { token }); let msg = LSPS2Message::Request(request_id.clone(), request).into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); request_id } @@ -160,6 +162,8 @@ where &self, counterparty_node_id: PublicKey, payment_size_msat: Option, opening_fee_params: LSPS2OpeningFeeParams, ) -> Result { + let mut message_queue_notifier = self.pending_messages.notifier(); + let request_id = crate::utils::generate_request_id(&self.entropy_source); { @@ -184,7 +188,7 @@ where let request = LSPS2Request::Buy(LSPS2BuyRequest { opening_fee_params, payment_size_msat }); let msg = LSPS2Message::Request(request_id.clone(), request).into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); Ok(request_id) } diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index e1666b1d352..8053d36c25d 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -499,7 +499,7 @@ impl PeerState { } macro_rules! get_or_insert_peer_state_entry { - ($self: ident, $outer_state_lock: expr, $counterparty_node_id: expr) => {{ + ($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{ // Return an internal error and abort if we hit the maximum allowed number of total peers. let is_limited_by_max_total_peers = $outer_state_lock.len() >= MAX_TOTAL_PEERS; match $outer_state_lock.entry(*$counterparty_node_id) { @@ -512,7 +512,7 @@ macro_rules! get_or_insert_peer_state_entry { let msg = LSPSMessage::Invalid(error_response); drop($outer_state_lock); - $self.pending_messages.enqueue($counterparty_node_id, msg); + $message_queue_notifier.enqueue($counterparty_node_id, msg); let err = format!( "Dropping request from peer {} due to reaching maximally allowed number of total peers: {}", @@ -581,6 +581,7 @@ where pub fn invalid_token_provided( &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, ) -> Result<(), APIError> { + let mut message_queue_notifier = self.pending_messages.notifier(); let (result, response) = { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -622,7 +623,7 @@ where if let Some(response) = response { let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); } result @@ -637,6 +638,7 @@ where &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, opening_fee_params_menu: Vec, ) -> Result<(), APIError> { + let mut message_queue_notifier = self.pending_messages.notifier(); let (result, response) = { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -689,7 +691,7 @@ where if let Some(response) = response { let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); } result @@ -707,6 +709,8 @@ where &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64, cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128, ) -> Result<(), APIError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + let (result, response) = { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -767,7 +771,7 @@ where if let Some(response) = response { let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); } result @@ -1202,11 +1206,16 @@ where &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2GetInfoRequest, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); let event_queue_notifier = self.pending_events.notifier(); let (result, response) = { let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let inner_state_lock = - get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id); + let inner_state_lock = get_or_insert_peer_state_entry!( + self, + outer_state_lock, + message_queue_notifier, + counterparty_node_id + ); let mut peer_state_lock = inner_state_lock.lock().unwrap(); let request = LSPS2Request::GetInfo(params.clone()); match self.insert_pending_request( @@ -1229,7 +1238,7 @@ where }; if let Some(msg) = response { - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); } result @@ -1238,6 +1247,7 @@ where fn handle_buy_request( &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); let event_queue_notifier = self.pending_events.notifier(); if let Some(payment_size_msat) = params.payment_size_msat { if payment_size_msat < params.opening_fee_params.min_payment_size_msat { @@ -1247,7 +1257,7 @@ where data: None, }); let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); return Err(LightningError { err: "payment size is below our minimum supported payment size".to_string(), @@ -1262,7 +1272,7 @@ where data: None, }); let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); return Err(LightningError { err: "payment size is above our maximum supported payment size".to_string(), action: ErrorAction::IgnoreAndLog(Level::Info), @@ -1283,7 +1293,7 @@ where data: None, }); let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); return Err(LightningError { err: "payment size is too small to cover the opening fee".to_string(), action: ErrorAction::IgnoreAndLog(Level::Info), @@ -1297,7 +1307,7 @@ where data: None, }); let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); return Err(LightningError { err: "overflow error when calculating opening_fee".to_string(), action: ErrorAction::IgnoreAndLog(Level::Info), @@ -1314,7 +1324,7 @@ where data: None, }); let msg = LSPS2Message::Response(request_id, response).into(); - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); return Err(LightningError { err: "invalid opening fee parameters were supplied by client".to_string(), action: ErrorAction::IgnoreAndLog(Level::Info), @@ -1323,8 +1333,12 @@ where let (result, response) = { let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let inner_state_lock = - get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id); + let inner_state_lock = get_or_insert_peer_state_entry!( + self, + outer_state_lock, + message_queue_notifier, + counterparty_node_id + ); let mut peer_state_lock = inner_state_lock.lock().unwrap(); let request = LSPS2Request::Buy(params.clone()); @@ -1350,7 +1364,7 @@ where }; if let Some(msg) = response { - self.pending_messages.enqueue(counterparty_node_id, msg); + message_queue_notifier.enqueue(counterparty_node_id, msg); } result diff --git a/lightning-liquidity/src/lsps5/client.rs b/lightning-liquidity/src/lsps5/client.rs index 50b2c85ce1d..bcad453d7ad 100644 --- a/lightning-liquidity/src/lsps5/client.rs +++ b/lightning-liquidity/src/lsps5/client.rs @@ -205,6 +205,7 @@ where pub fn set_webhook( &self, counterparty_node_id: PublicKey, app_name: String, webhook_url: String, ) -> Result { + let mut message_queue_notifier = self.pending_messages.notifier(); let app_name = LSPS5AppName::from_string(app_name)?; let lsps_webhook_url = LSPS5WebhookUrl::from_string(webhook_url)?; @@ -228,7 +229,7 @@ where LSPS5Request::SetWebhook(SetWebhookRequest { app_name, webhook: lsps_webhook_url }); let message = LSPS5Message::Request(request_id.clone(), request); - self.pending_messages.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message)); + message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message)); Ok(request_id) } @@ -250,6 +251,7 @@ where /// [`WebhooksListed`]: super::event::LSPS5ClientEvent::WebhooksListed /// [`LSPS5Response::ListWebhooks`]: super::msgs::LSPS5Response::ListWebhooks pub fn list_webhooks(&self, counterparty_node_id: PublicKey) -> LSPSRequestId { + let mut message_queue_notifier = self.pending_messages.notifier(); let request_id = generate_request_id(&self.entropy_source); let now = LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); @@ -260,7 +262,7 @@ where let request = LSPS5Request::ListWebhooks(ListWebhooksRequest {}); let message = LSPS5Message::Request(request_id.clone(), request); - self.pending_messages.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message)); + message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message)); request_id } @@ -287,6 +289,7 @@ where pub fn remove_webhook( &self, counterparty_node_id: PublicKey, app_name: String, ) -> Result { + let mut message_queue_notifier = self.pending_messages.notifier(); let app_name = LSPS5AppName::from_string(app_name)?; let request_id = generate_request_id(&self.entropy_source); @@ -301,7 +304,7 @@ where let request = LSPS5Request::RemoveWebhook(RemoveWebhookRequest { app_name }); let message = LSPS5Message::Request(request_id.clone(), request); - self.pending_messages.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message)); + message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message)); Ok(request_id) } diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 984dd5d0575..97fde364ba1 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -158,6 +158,8 @@ where &self, counterparty_node_id: PublicKey, request_id: LSPSRequestId, params: SetWebhookRequest, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + self.check_prune_stale_webhooks(); let mut webhooks = self.webhooks.lock().unwrap(); @@ -192,7 +194,7 @@ where LSPS5Response::SetWebhookError(error.clone().into()), ) .into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); return Err(LightningError { err: error.message().into(), action: ErrorAction::IgnoreAndLog(Level::Info), @@ -221,7 +223,7 @@ where LSPS5Response::SetWebhookError(e.clone().into()), ) .into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); LightningError { err: e.message().into(), action: ErrorAction::IgnoreAndLog(Level::Info), @@ -238,7 +240,7 @@ where }), ) .into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); Ok(()) } @@ -246,6 +248,8 @@ where &self, counterparty_node_id: PublicKey, request_id: LSPSRequestId, _params: ListWebhooksRequest, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + self.check_prune_stale_webhooks(); let webhooks = self.webhooks.lock().unwrap(); @@ -259,7 +263,7 @@ where let response = ListWebhooksResponse { app_names, max_webhooks }; let msg = LSPS5Message::Response(request_id, LSPS5Response::ListWebhooks(response)).into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); Ok(()) } @@ -268,6 +272,8 @@ where &self, counterparty_node_id: PublicKey, request_id: LSPSRequestId, params: RemoveWebhookRequest, ) -> Result<(), LightningError> { + let mut message_queue_notifier = self.pending_messages.notifier(); + self.check_prune_stale_webhooks(); let mut webhooks = self.webhooks.lock().unwrap(); @@ -278,7 +284,7 @@ where let msg = LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response)) .into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); return Ok(()); } @@ -291,7 +297,7 @@ where ) .into(); - self.pending_messages.enqueue(&counterparty_node_id, msg); + message_queue_notifier.enqueue(&counterparty_node_id, msg); return Err(LightningError { err: error.message().into(), action: ErrorAction::IgnoreAndLog(Level::Info), diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 6d412bd966a..bba3f6ea0d3 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -633,13 +633,15 @@ where LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map) } .map_err(|_| { + let mut message_queue_notifier = self.pending_messages.notifier(); + let error = LSPSResponseError { code: JSONRPC_INVALID_MESSAGE_ERROR_CODE, message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(), data: None, }; - self.pending_messages.enqueue(&sender_node_id, LSPSMessage::Invalid(error)); + message_queue_notifier.enqueue(&sender_node_id, LSPSMessage::Invalid(error)); self.ignored_peers.write().unwrap().insert(sender_node_id); let err = format!( "Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.", diff --git a/lightning-liquidity/src/message_queue.rs b/lightning-liquidity/src/message_queue.rs index 58060862f07..45b3c7f48af 100644 --- a/lightning-liquidity/src/message_queue.rs +++ b/lightning-liquidity/src/message_queue.rs @@ -33,11 +33,29 @@ impl MessageQueue { self.pending_msgs_notifier.get_future() } - pub(crate) fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) { - { - let mut queue = self.queue.lock().unwrap(); - queue.push_back((*counterparty_node_id, msg)); + pub(crate) fn notifier(&self) -> MessageQueueNotifierGuard { + MessageQueueNotifierGuard { msg_queue: self, buffer: VecDeque::new() } + } +} + +// A guard type that will process buffered messages and wake the background processor when dropped. +#[must_use] +pub(crate) struct MessageQueueNotifierGuard<'a> { + msg_queue: &'a MessageQueue, + buffer: VecDeque<(PublicKey, LSPSMessage)>, +} + +impl<'a> MessageQueueNotifierGuard<'a> { + pub fn enqueue(&mut self, counterparty_node_id: &PublicKey, msg: LSPSMessage) { + self.buffer.push_back((*counterparty_node_id, msg)); + } +} + +impl<'a> Drop for MessageQueueNotifierGuard<'a> { + fn drop(&mut self) { + if !self.buffer.is_empty() { + self.msg_queue.queue.lock().unwrap().append(&mut self.buffer); + self.msg_queue.pending_msgs_notifier.notify(); } - self.pending_msgs_notifier.notify(); } } From 309591a170e0a9ad185256cab7d560a2a03f4062 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 1 Aug 2025 12:36:02 +0200 Subject: [PATCH 2/2] Simplify message handling logic Now that we have the `MessageQueueNotifierGuard`, we can be sure that we always dropped locks before notifying. Hence, we can save *a lot* of error-prone boilerplate that we used to ensure we'd only enqueue if we dropped locks. --- lightning-liquidity/src/lsps1/client.rs | 57 ++- lightning-liquidity/src/lsps1/service.rs | 168 ++++----- lightning-liquidity/src/lsps2/service.rs | 421 ++++++++++------------- 3 files changed, 266 insertions(+), 380 deletions(-) diff --git a/lightning-liquidity/src/lsps1/client.rs b/lightning-liquidity/src/lsps1/client.rs index fb9ea2ef8f3..45008baaa77 100644 --- a/lightning-liquidity/src/lsps1/client.rs +++ b/lightning-liquidity/src/lsps1/client.rs @@ -202,27 +202,19 @@ where ) -> LSPSRequestId { let mut message_queue_notifier = self.pending_messages.notifier(); - let (request_id, request_msg) = { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let inner_state_lock = outer_state_lock - .entry(*counterparty_node_id) - .or_insert(Mutex::new(PeerState::default())); - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - - let request_id = crate::utils::generate_request_id(&self.entropy_source); - let request = LSPS1Request::CreateOrder(LSPS1CreateOrderRequest { - order, - refund_onchain_address, - }); - let msg = LSPS1Message::Request(request_id.clone(), request).into(); - peer_state_lock.pending_create_order_requests.insert(request_id.clone()); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock = outer_state_lock + .entry(*counterparty_node_id) + .or_insert(Mutex::new(PeerState::default())); + let mut peer_state_lock = inner_state_lock.lock().unwrap(); - (request_id, Some(msg)) - }; + let request_id = crate::utils::generate_request_id(&self.entropy_source); + let request = + LSPS1Request::CreateOrder(LSPS1CreateOrderRequest { order, refund_onchain_address }); + let msg = LSPS1Message::Request(request_id.clone(), request).into(); + peer_state_lock.pending_create_order_requests.insert(request_id.clone()); - if let Some(msg) = request_msg { - message_queue_notifier.enqueue(&counterparty_node_id, msg); - } + message_queue_notifier.enqueue(&counterparty_node_id, msg); request_id } @@ -328,26 +320,19 @@ where ) -> LSPSRequestId { let mut message_queue_notifier = self.pending_messages.notifier(); - let (request_id, request_msg) = { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let inner_state_lock = outer_state_lock - .entry(*counterparty_node_id) - .or_insert(Mutex::new(PeerState::default())); - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - - let request_id = crate::utils::generate_request_id(&self.entropy_source); - peer_state_lock.pending_get_order_requests.insert(request_id.clone()); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock = outer_state_lock + .entry(*counterparty_node_id) + .or_insert(Mutex::new(PeerState::default())); + let mut peer_state_lock = inner_state_lock.lock().unwrap(); - let request = - LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() }); - let msg = LSPS1Message::Request(request_id.clone(), request).into(); + let request_id = crate::utils::generate_request_id(&self.entropy_source); + peer_state_lock.pending_get_order_requests.insert(request_id.clone()); - (request_id, Some(msg)) - }; + let request = LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() }); + let msg = LSPS1Message::Request(request_id.clone(), request).into(); - if let Some(msg) = request_msg { - message_queue_notifier.enqueue(&counterparty_node_id, msg); - } + message_queue_notifier.enqueue(&counterparty_node_id, msg); request_id } diff --git a/lightning-liquidity/src/lsps1/service.rs b/lightning-liquidity/src/lsps1/service.rs index a7d856eb64c..4dadf2e03dc 100644 --- a/lightning-liquidity/src/lsps1/service.rs +++ b/lightning-liquidity/src/lsps1/service.rs @@ -255,66 +255,46 @@ where payment: LSPS1PaymentInfo, created_at: LSPSDateTime, ) -> Result<(), APIError> { let mut message_queue_notifier = self.pending_messages.notifier(); - let (result, response) = { - let outer_state_lock = self.per_peer_state.read().unwrap(); - - match outer_state_lock.get(counterparty_node_id) { - Some(inner_state_lock) => { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - - match peer_state_lock.pending_requests.remove(&request_id) { - Some(LSPS1Request::CreateOrder(params)) => { - let order_id = self.generate_order_id(); - let channel = OutboundCRChannel::new( - params.order.clone(), - created_at.clone(), - order_id.clone(), - payment.clone(), - ); - - peer_state_lock.insert_outbound_channel(order_id.clone(), channel); - - let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse { - order: params.order, - order_id, - order_state: LSPS1OrderState::Created, - created_at, - payment, - channel: None, - }); - - (Ok(()), Some(response)) - }, - - _ => ( - Err(APIError::APIMisuseError { - err: format!( - "No pending buy request for request_id: {:?}", - request_id - ), - }), - None, - ), - } - }, - None => ( - Err(APIError::APIMisuseError { - err: format!( - "No state for the counterparty exists: {:?}", - counterparty_node_id - ), - }), - None, - ), - } - }; - if let Some(response) = response { - let msg = LSPS1Message::Response(request_id, response).into(); - message_queue_notifier.enqueue(counterparty_node_id, msg); - } + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + + match peer_state_lock.pending_requests.remove(&request_id) { + Some(LSPS1Request::CreateOrder(params)) => { + let order_id = self.generate_order_id(); + let channel = OutboundCRChannel::new( + params.order.clone(), + created_at.clone(), + order_id.clone(), + payment.clone(), + ); - result + peer_state_lock.insert_outbound_channel(order_id.clone(), channel); + + let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse { + order: params.order, + order_id, + order_state: LSPS1OrderState::Created, + created_at, + payment, + channel: None, + }); + let msg = LSPS1Message::Response(request_id, response).into(); + message_queue_notifier.enqueue(counterparty_node_id, msg); + Ok(()) + }, + + _ => Err(APIError::APIMisuseError { + err: format!("No pending buy request for request_id: {:?}", request_id), + }), + } + }, + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), + } } fn handle_get_order_request( @@ -383,54 +363,38 @@ where ) -> Result<(), APIError> { let mut message_queue_notifier = self.pending_messages.notifier(); - let (result, response) = { - let outer_state_lock = self.per_peer_state.read().unwrap(); - - match outer_state_lock.get(&counterparty_node_id) { - Some(inner_state_lock) => { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(outbound_channel) = - peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id) - { - let config = &outbound_channel.config; + match outer_state_lock.get(&counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); - let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse { - order_id, - order: config.order.clone(), - order_state, - created_at: config.created_at.clone(), - payment: config.payment.clone(), - channel, - }); - (Ok(()), Some(response)) - } else { - ( - Err(APIError::APIMisuseError { - err: format!("Channel with order_id {} not found", order_id.0), - }), - None, - ) - } - }, - None => ( + if let Some(outbound_channel) = + peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id) + { + let config = &outbound_channel.config; + + let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse { + order_id, + order: config.order.clone(), + order_state, + created_at: config.created_at.clone(), + payment: config.payment.clone(), + channel, + }); + let msg = LSPS1Message::Response(request_id, response).into(); + message_queue_notifier.enqueue(&counterparty_node_id, msg); + Ok(()) + } else { Err(APIError::APIMisuseError { - err: format!( - "No existing state with counterparty {}", - counterparty_node_id - ), - }), - None, - ), - } - }; - - if let Some(response) = response { - let msg = LSPS1Message::Response(request_id, response).into(); - message_queue_notifier.enqueue(&counterparty_node_id, msg); + err: format!("Channel with order_id {} not found", order_id.0), + }) + } + }, + None => Err(APIError::APIMisuseError { + err: format!("No existing state with counterparty {}", counterparty_node_id), + }), } - - result } fn generate_order_id(&self) -> LSPS1OrderId { diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 8053d36c25d..309d7ae1755 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -27,7 +27,7 @@ use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue}; use crate::lsps2::utils::{ compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params, }; -use crate::message_queue::MessageQueue; +use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard}; use crate::prelude::hash_map::Entry; use crate::prelude::{new_hash_map, HashMap}; use crate::sync::{Arc, Mutex, MutexGuard, RwLock}; @@ -511,7 +511,6 @@ macro_rules! get_or_insert_peer_state_entry { }; let msg = LSPSMessage::Invalid(error_response); - drop($outer_state_lock); $message_queue_notifier.enqueue($counterparty_node_id, msg); let err = format!( @@ -582,51 +581,36 @@ where &self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, ) -> Result<(), APIError> { let mut message_queue_notifier = self.pending_messages.notifier(); - let (result, response) = { - let outer_state_lock = self.per_peer_state.read().unwrap(); - match outer_state_lock.get(counterparty_node_id) { - Some(inner_state_lock) => { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - - match self.remove_pending_request(&mut peer_state_lock, &request_id) { - Some(LSPS2Request::GetInfo(_)) => { - let response = LSPS2Response::GetInfoError(LSPSResponseError { - code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE, - message: "an unrecognized or stale token was provided".to_string(), - data: None, - }); - (Ok(()), Some(response)) - }, - _ => ( - Err(APIError::APIMisuseError { - err: format!( - "No pending get_info request for request_id: {:?}", - request_id - ), - }), - None, - ), - } - }, - None => ( - Err(APIError::APIMisuseError { + let outer_state_lock = self.per_peer_state.read().unwrap(); + + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + + match self.remove_pending_request(&mut peer_state_lock, &request_id) { + Some(LSPS2Request::GetInfo(_)) => { + let response = LSPS2Response::GetInfoError(LSPSResponseError { + code: LSPS2_GET_INFO_REQUEST_UNRECOGNIZED_OR_STALE_TOKEN_ERROR_CODE, + message: "an unrecognized or stale token was provided".to_string(), + data: None, + }); + let msg = LSPS2Message::Response(request_id, response).into(); + message_queue_notifier.enqueue(counterparty_node_id, msg); + Ok(()) + }, + _ => Err(APIError::APIMisuseError { err: format!( - "No state for the counterparty exists: {:?}", - counterparty_node_id + "No pending get_info request for request_id: {:?}", + request_id ), }), - None, - ), - } - }; - - if let Some(response) = response { - let msg = LSPS2Message::Response(request_id, response).into(); - message_queue_notifier.enqueue(counterparty_node_id, msg); + } + }, + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), } - - result } /// Used by LSP to provide fee parameters to a client requesting a JIT Channel. @@ -639,62 +623,47 @@ where opening_fee_params_menu: Vec, ) -> Result<(), APIError> { let mut message_queue_notifier = self.pending_messages.notifier(); - let (result, response) = { - let outer_state_lock = self.per_peer_state.read().unwrap(); - match outer_state_lock.get(counterparty_node_id) { - Some(inner_state_lock) => { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - - match self.remove_pending_request(&mut peer_state_lock, &request_id) { - Some(LSPS2Request::GetInfo(_)) => { - let mut opening_fee_params_menu: Vec = - opening_fee_params_menu - .into_iter() - .map(|param| { - param.into_opening_fee_params(&self.config.promise_secret) - }) - .collect(); - opening_fee_params_menu.sort_by(|a, b| { - match a.min_fee_msat.cmp(&b.min_fee_msat) { - CmpOrdering::Equal => a.proportional.cmp(&b.proportional), - other => other, - } - }); - let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse { - opening_fee_params_menu, - }); - (Ok(()), Some(response)) - }, - _ => ( - Err(APIError::APIMisuseError { - err: format!( - "No pending get_info request for request_id: {:?}", - request_id - ), - }), - None, - ), - } - }, - None => ( - Err(APIError::APIMisuseError { + let outer_state_lock = self.per_peer_state.read().unwrap(); + + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + + match self.remove_pending_request(&mut peer_state_lock, &request_id) { + Some(LSPS2Request::GetInfo(_)) => { + let mut opening_fee_params_menu: Vec = + opening_fee_params_menu + .into_iter() + .map(|param| { + param.into_opening_fee_params(&self.config.promise_secret) + }) + .collect(); + opening_fee_params_menu.sort_by(|a, b| { + match a.min_fee_msat.cmp(&b.min_fee_msat) { + CmpOrdering::Equal => a.proportional.cmp(&b.proportional), + other => other, + } + }); + let response = LSPS2Response::GetInfo(LSPS2GetInfoResponse { + opening_fee_params_menu, + }); + let msg = LSPS2Message::Response(request_id, response).into(); + message_queue_notifier.enqueue(counterparty_node_id, msg); + Ok(()) + }, + _ => Err(APIError::APIMisuseError { err: format!( - "No state for the counterparty exists: {:?}", - counterparty_node_id + "No pending get_info request for request_id: {:?}", + request_id ), }), - None, - ), - } - }; - - if let Some(response) = response { - let msg = LSPS2Message::Response(request_id, response).into(); - message_queue_notifier.enqueue(counterparty_node_id, msg); + } + }, + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), } - - result } /// Used by LSP to provide the client with the intercept scid and @@ -711,70 +680,50 @@ where ) -> Result<(), APIError> { let mut message_queue_notifier = self.pending_messages.notifier(); - let (result, response) = { - let outer_state_lock = self.per_peer_state.read().unwrap(); + let outer_state_lock = self.per_peer_state.read().unwrap(); - match outer_state_lock.get(counterparty_node_id) { - Some(inner_state_lock) => { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); + match outer_state_lock.get(counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); - match self.remove_pending_request(&mut peer_state_lock, &request_id) { - Some(LSPS2Request::Buy(buy_request)) => { - { - let mut peer_by_intercept_scid = - self.peer_by_intercept_scid.write().unwrap(); - peer_by_intercept_scid - .insert(intercept_scid, *counterparty_node_id); - } + match self.remove_pending_request(&mut peer_state_lock, &request_id) { + Some(LSPS2Request::Buy(buy_request)) => { + { + let mut peer_by_intercept_scid = + self.peer_by_intercept_scid.write().unwrap(); + peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id); + } - let outbound_jit_channel = OutboundJITChannel::new( - buy_request.payment_size_msat, - buy_request.opening_fee_params, - user_channel_id, - ); - - peer_state_lock - .intercept_scid_by_user_channel_id - .insert(user_channel_id, intercept_scid); - peer_state_lock - .insert_outbound_channel(intercept_scid, outbound_jit_channel); - - let response = LSPS2Response::Buy(LSPS2BuyResponse { - jit_channel_scid: intercept_scid.into(), - lsp_cltv_expiry_delta: cltv_expiry_delta, - client_trusts_lsp, - }); - (Ok(()), Some(response)) - }, - _ => ( - Err(APIError::APIMisuseError { - err: format!( - "No pending buy request for request_id: {:?}", - request_id - ), - }), - None, - ), - } - }, - None => ( - Err(APIError::APIMisuseError { - err: format!( - "No state for the counterparty exists: {:?}", - counterparty_node_id - ), + let outbound_jit_channel = OutboundJITChannel::new( + buy_request.payment_size_msat, + buy_request.opening_fee_params, + user_channel_id, + ); + + peer_state_lock + .intercept_scid_by_user_channel_id + .insert(user_channel_id, intercept_scid); + peer_state_lock + .insert_outbound_channel(intercept_scid, outbound_jit_channel); + + let response = LSPS2Response::Buy(LSPS2BuyResponse { + jit_channel_scid: intercept_scid.into(), + lsp_cltv_expiry_delta: cltv_expiry_delta, + client_trusts_lsp, + }); + let msg = LSPS2Message::Response(request_id, response).into(); + message_queue_notifier.enqueue(counterparty_node_id, msg); + Ok(()) + }, + _ => Err(APIError::APIMisuseError { + err: format!("No pending buy request for request_id: {:?}", request_id), }), - None, - ), - } - }; - - if let Some(response) = response { - let msg = LSPS2Message::Response(request_id, response).into(); - message_queue_notifier.enqueue(counterparty_node_id, msg); + } + }, + None => Err(APIError::APIMisuseError { + err: format!("No state for the counterparty exists: {:?}", counterparty_node_id), + }), } - - result } /// Forward [`Event::HTLCIntercepted`] event parameters into this function. @@ -1208,40 +1157,32 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); let event_queue_notifier = self.pending_events.notifier(); - let (result, response) = { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let inner_state_lock = get_or_insert_peer_state_entry!( - self, - outer_state_lock, - message_queue_notifier, - counterparty_node_id - ); - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - let request = LSPS2Request::GetInfo(params.clone()); - match self.insert_pending_request( - &mut peer_state_lock, - request_id.clone(), - *counterparty_node_id, - request, - ) { - (Ok(()), msg) => { - let event = LSPS2ServiceEvent::GetInfo { - request_id, - counterparty_node_id: *counterparty_node_id, - token: params.token, - }; - event_queue_notifier.enqueue(event); - (Ok(()), msg) - }, - (e, msg) => (e, msg), - } - }; - if let Some(msg) = response { - message_queue_notifier.enqueue(counterparty_node_id, msg); - } + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock = get_or_insert_peer_state_entry!( + self, + outer_state_lock, + message_queue_notifier, + counterparty_node_id + ); + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + let request = LSPS2Request::GetInfo(params.clone()); + self.insert_pending_request( + &mut peer_state_lock, + &mut message_queue_notifier, + request_id.clone(), + *counterparty_node_id, + request, + )?; + + let event = LSPS2ServiceEvent::GetInfo { + request_id, + counterparty_node_id: *counterparty_node_id, + token: params.token, + }; + event_queue_notifier.enqueue(event); - result + Ok(()) } fn handle_buy_request( @@ -1331,87 +1272,83 @@ where }); } - let (result, response) = { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let inner_state_lock = get_or_insert_peer_state_entry!( - self, - outer_state_lock, - message_queue_notifier, - counterparty_node_id - ); - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - - let request = LSPS2Request::Buy(params.clone()); - match self.insert_pending_request( - &mut peer_state_lock, - request_id.clone(), - *counterparty_node_id, - request, - ) { - (Ok(()), msg) => { - let event = LSPS2ServiceEvent::BuyRequest { - request_id, - counterparty_node_id: *counterparty_node_id, - opening_fee_params: params.opening_fee_params, - payment_size_msat: params.payment_size_msat, - }; - event_queue_notifier.enqueue(event); - - (Ok(()), msg) - }, - (e, msg) => (e, msg), - } + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let inner_state_lock = get_or_insert_peer_state_entry!( + self, + outer_state_lock, + message_queue_notifier, + counterparty_node_id + ); + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + + let request = LSPS2Request::Buy(params.clone()); + + self.insert_pending_request( + &mut peer_state_lock, + &mut message_queue_notifier, + request_id.clone(), + *counterparty_node_id, + request, + )?; + + let event = LSPS2ServiceEvent::BuyRequest { + request_id, + counterparty_node_id: *counterparty_node_id, + opening_fee_params: params.opening_fee_params, + payment_size_msat: params.payment_size_msat, }; + event_queue_notifier.enqueue(event); - if let Some(msg) = response { - message_queue_notifier.enqueue(counterparty_node_id, msg); - } - - result + Ok(()) } fn insert_pending_request<'a>( - &self, peer_state_lock: &mut MutexGuard<'a, PeerState>, request_id: LSPSRequestId, + &self, peer_state_lock: &mut MutexGuard<'a, PeerState>, + message_queue_notifier: &mut MessageQueueNotifierGuard, request_id: LSPSRequestId, counterparty_node_id: PublicKey, request: LSPS2Request, - ) -> (Result<(), LightningError>, Option) { - let create_pending_request_limit_exceeded_response = |error_message: String| { - let error_details = LSPSResponseError { - code: LSPS0_CLIENT_REJECTED_ERROR_CODE, - message: "Reached maximum number of pending requests. Please try again later." - .to_string(), - data: None, - }; - let response = match &request { - LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details), - LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details), - }; - let msg = Some(LSPS2Message::Response(request_id.clone(), response).into()); + ) -> Result<(), LightningError> { + let create_pending_request_limit_exceeded_response = + |message_queue_notifier: &mut MessageQueueNotifierGuard, error_message: String| { + let error_details = LSPSResponseError { + code: LSPS0_CLIENT_REJECTED_ERROR_CODE, + message: "Reached maximum number of pending requests. Please try again later." + .to_string(), + data: None, + }; + let response = match &request { + LSPS2Request::GetInfo(_) => LSPS2Response::GetInfoError(error_details), + LSPS2Request::Buy(_) => LSPS2Response::BuyError(error_details), + }; + let msg = LSPS2Message::Response(request_id.clone(), response).into(); + message_queue_notifier.enqueue(&counterparty_node_id, msg); - let result = Err(LightningError { - err: error_message, - action: ErrorAction::IgnoreAndLog(Level::Debug), - }); - (result, msg) - }; + Err(LightningError { + err: error_message, + action: ErrorAction::IgnoreAndLog(Level::Debug), + }) + }; if self.total_pending_requests.load(Ordering::Relaxed) >= MAX_TOTAL_PENDING_REQUESTS { let error_message = format!( "Reached maximum number of total pending requests: {}", MAX_TOTAL_PENDING_REQUESTS ); - return create_pending_request_limit_exceeded_response(error_message); + return create_pending_request_limit_exceeded_response( + message_queue_notifier, + error_message, + ); } if peer_state_lock.pending_requests_and_channels() < MAX_PENDING_REQUESTS_PER_PEER { peer_state_lock.pending_requests.insert(request_id, request); self.total_pending_requests.fetch_add(1, Ordering::Relaxed); - (Ok(()), None) + Ok(()) } else { let error_message = format!( "Peer {} reached maximum number of pending requests: {}", counterparty_node_id, MAX_PENDING_REQUESTS_PER_PEER ); - create_pending_request_limit_exceeded_response(error_message) + create_pending_request_limit_exceeded_response(message_queue_notifier, error_message) } }