Skip to content

Commit 107de27

Browse files
committed
Introduce MessageQueueNotifierGuard type
Previously, when enqueuing new messages to the `MessageQueue`, we'd directly notify the BP to handle the messages, potentially causing multiple wake-ups in short succession, and risking that we'd reenter with crucial locks still held. Here, we instead introduce a `MessageQueueNotifierGuard` type that parallels our recently-introduced `EventQueueNotifierGuard`, buffers the messages, and will only append them to the message queue and notify the BP when dropped. This will allow us to remove a lot of error-prone boilerplate in the next step.
1 parent 61e5819 commit 107de27

File tree

10 files changed

+106
-42
lines changed

10 files changed

+106
-42
lines changed

lightning-liquidity/src/lsps0/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ where
5050
/// specifcation](https://github.com/lightning/blips/blob/master/blip-0050.md#lsps-specification-support-query)
5151
/// for more information.
5252
pub fn list_protocols(&self, counterparty_node_id: &PublicKey) {
53+
let mut message_queue_notifier = self.pending_messages.notifier();
54+
5355
let msg = LSPS0Message::Request(
5456
utils::generate_request_id(&self.entropy_source),
5557
LSPS0Request::ListProtocols(LSPS0ListProtocolsRequest {}),
5658
);
5759

58-
self.pending_messages.enqueue(counterparty_node_id, msg.into());
60+
message_queue_notifier.enqueue(counterparty_node_id, msg.into());
5961
}
6062

6163
fn handle_response(

lightning-liquidity/src/lsps0/service.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ impl LSPS0ServiceHandler {
4040
fn handle_request(
4141
&self, request_id: LSPSRequestId, request: LSPS0Request, counterparty_node_id: &PublicKey,
4242
) -> Result<(), lightning::ln::msgs::LightningError> {
43+
let mut message_queue_notifier = self.pending_messages.notifier();
44+
4345
match request {
4446
LSPS0Request::ListProtocols(_) => {
4547
let msg = LSPS0Message::Response(
@@ -48,7 +50,7 @@ impl LSPS0ServiceHandler {
4850
protocols: self.protocols.clone(),
4951
}),
5052
);
51-
self.pending_messages.enqueue(counterparty_node_id, msg.into());
53+
message_queue_notifier.enqueue(counterparty_node_id, msg.into());
5254
Ok(())
5355
},
5456
}

lightning-liquidity/src/lsps1/client.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ where
9090
///
9191
/// [`SupportedOptionsReady`]: crate::lsps1::event::LSPS1ClientEvent::SupportedOptionsReady
9292
pub fn request_supported_options(&self, counterparty_node_id: PublicKey) -> LSPSRequestId {
93+
let mut message_queue_notifier = self.pending_messages.notifier();
94+
9395
let request_id = crate::utils::generate_request_id(&self.entropy_source);
9496
{
9597
let mut outer_state_lock = self.per_peer_state.write().unwrap();
@@ -102,7 +104,7 @@ where
102104

103105
let request = LSPS1Request::GetInfo(LSPS1GetInfoRequest {});
104106
let msg = LSPS1Message::Request(request_id.clone(), request).into();
105-
self.pending_messages.enqueue(&counterparty_node_id, msg);
107+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
106108
request_id
107109
}
108110

@@ -198,6 +200,8 @@ where
198200
&self, counterparty_node_id: &PublicKey, order: LSPS1OrderParams,
199201
refund_onchain_address: Option<Address>,
200202
) -> LSPSRequestId {
203+
let mut message_queue_notifier = self.pending_messages.notifier();
204+
201205
let (request_id, request_msg) = {
202206
let mut outer_state_lock = self.per_peer_state.write().unwrap();
203207
let inner_state_lock = outer_state_lock
@@ -217,7 +221,7 @@ where
217221
};
218222

219223
if let Some(msg) = request_msg {
220-
self.pending_messages.enqueue(&counterparty_node_id, msg);
224+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
221225
}
222226

223227
request_id
@@ -322,6 +326,8 @@ where
322326
pub fn check_order_status(
323327
&self, counterparty_node_id: &PublicKey, order_id: LSPS1OrderId,
324328
) -> LSPSRequestId {
329+
let mut message_queue_notifier = self.pending_messages.notifier();
330+
325331
let (request_id, request_msg) = {
326332
let mut outer_state_lock = self.per_peer_state.write().unwrap();
327333
let inner_state_lock = outer_state_lock
@@ -340,7 +346,7 @@ where
340346
};
341347

342348
if let Some(msg) = request_msg {
343-
self.pending_messages.enqueue(&counterparty_node_id, msg);
349+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
344350
}
345351

346352
request_id

lightning-liquidity/src/lsps1/service.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ where
177177
fn handle_get_info_request(
178178
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
179179
) -> Result<(), LightningError> {
180+
let mut message_queue_notifier = self.pending_messages.notifier();
181+
180182
let response = LSPS1Response::GetInfo(LSPS1GetInfoResponse {
181183
options: self
182184
.config
@@ -190,15 +192,17 @@ where
190192
});
191193

192194
let msg = LSPS1Message::Response(request_id, response).into();
193-
self.pending_messages.enqueue(counterparty_node_id, msg);
195+
message_queue_notifier.enqueue(counterparty_node_id, msg);
194196
Ok(())
195197
}
196198

197199
fn handle_create_order_request(
198200
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
199201
params: LSPS1CreateOrderRequest,
200202
) -> Result<(), LightningError> {
203+
let mut message_queue_notifier = self.pending_messages.notifier();
201204
let event_queue_notifier = self.pending_events.notifier();
205+
202206
if !is_valid(&params.order, &self.config.supported_options.as_ref().unwrap()) {
203207
let response = LSPS1Response::CreateOrderError(LSPSResponseError {
204208
code: LSPS1_CREATE_ORDER_REQUEST_ORDER_MISMATCH_ERROR_CODE,
@@ -209,7 +213,7 @@ where
209213
)),
210214
});
211215
let msg = LSPS1Message::Response(request_id, response).into();
212-
self.pending_messages.enqueue(counterparty_node_id, msg);
216+
message_queue_notifier.enqueue(counterparty_node_id, msg);
213217
return Err(LightningError {
214218
err: format!(
215219
"Client order does not match any supported options: {:?}",
@@ -250,6 +254,7 @@ where
250254
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
251255
payment: LSPS1PaymentInfo, created_at: LSPSDateTime,
252256
) -> Result<(), APIError> {
257+
let mut message_queue_notifier = self.pending_messages.notifier();
253258
let (result, response) = {
254259
let outer_state_lock = self.per_peer_state.read().unwrap();
255260

@@ -306,7 +311,7 @@ where
306311

307312
if let Some(response) = response {
308313
let msg = LSPS1Message::Response(request_id, response).into();
309-
self.pending_messages.enqueue(counterparty_node_id, msg);
314+
message_queue_notifier.enqueue(counterparty_node_id, msg);
310315
}
311316

312317
result
@@ -376,6 +381,8 @@ where
376381
&self, request_id: LSPSRequestId, counterparty_node_id: PublicKey, order_id: LSPS1OrderId,
377382
order_state: LSPS1OrderState, channel: Option<LSPS1ChannelInfo>,
378383
) -> Result<(), APIError> {
384+
let mut message_queue_notifier = self.pending_messages.notifier();
385+
379386
let (result, response) = {
380387
let outer_state_lock = self.per_peer_state.read().unwrap();
381388

@@ -420,7 +427,7 @@ where
420427

421428
if let Some(response) = response {
422429
let msg = LSPS1Message::Response(request_id, response).into();
423-
self.pending_messages.enqueue(&counterparty_node_id, msg);
430+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
424431
}
425432

426433
result

lightning-liquidity/src/lsps2/client.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ where
118118
pub fn request_opening_params(
119119
&self, counterparty_node_id: PublicKey, token: Option<String>,
120120
) -> LSPSRequestId {
121+
let mut message_queue_notifier = self.pending_messages.notifier();
122+
121123
let request_id = crate::utils::generate_request_id(&self.entropy_source);
122124

123125
{
@@ -131,7 +133,7 @@ where
131133

132134
let request = LSPS2Request::GetInfo(LSPS2GetInfoRequest { token });
133135
let msg = LSPS2Message::Request(request_id.clone(), request).into();
134-
self.pending_messages.enqueue(&counterparty_node_id, msg);
136+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
135137

136138
request_id
137139
}
@@ -160,6 +162,8 @@ where
160162
&self, counterparty_node_id: PublicKey, payment_size_msat: Option<u64>,
161163
opening_fee_params: LSPS2OpeningFeeParams,
162164
) -> Result<LSPSRequestId, APIError> {
165+
let mut message_queue_notifier = self.pending_messages.notifier();
166+
163167
let request_id = crate::utils::generate_request_id(&self.entropy_source);
164168

165169
{
@@ -184,7 +188,7 @@ where
184188

185189
let request = LSPS2Request::Buy(LSPS2BuyRequest { opening_fee_params, payment_size_msat });
186190
let msg = LSPS2Message::Request(request_id.clone(), request).into();
187-
self.pending_messages.enqueue(&counterparty_node_id, msg);
191+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
188192

189193
Ok(request_id)
190194
}

lightning-liquidity/src/lsps2/service.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ impl PeerState {
499499
}
500500

501501
macro_rules! get_or_insert_peer_state_entry {
502-
($self: ident, $outer_state_lock: expr, $counterparty_node_id: expr) => {{
502+
($self: ident, $outer_state_lock: expr, $message_queue_notifier: expr, $counterparty_node_id: expr) => {{
503503
// Return an internal error and abort if we hit the maximum allowed number of total peers.
504504
let is_limited_by_max_total_peers = $outer_state_lock.len() >= MAX_TOTAL_PEERS;
505505
match $outer_state_lock.entry(*$counterparty_node_id) {
@@ -512,7 +512,7 @@ macro_rules! get_or_insert_peer_state_entry {
512512

513513
let msg = LSPSMessage::Invalid(error_response);
514514
drop($outer_state_lock);
515-
$self.pending_messages.enqueue($counterparty_node_id, msg);
515+
$message_queue_notifier.enqueue($counterparty_node_id, msg);
516516

517517
let err = format!(
518518
"Dropping request from peer {} due to reaching maximally allowed number of total peers: {}",
@@ -581,6 +581,7 @@ where
581581
pub fn invalid_token_provided(
582582
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
583583
) -> Result<(), APIError> {
584+
let mut message_queue_notifier = self.pending_messages.notifier();
584585
let (result, response) = {
585586
let outer_state_lock = self.per_peer_state.read().unwrap();
586587

@@ -622,7 +623,7 @@ where
622623

623624
if let Some(response) = response {
624625
let msg = LSPS2Message::Response(request_id, response).into();
625-
self.pending_messages.enqueue(counterparty_node_id, msg);
626+
message_queue_notifier.enqueue(counterparty_node_id, msg);
626627
}
627628

628629
result
@@ -637,6 +638,7 @@ where
637638
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId,
638639
opening_fee_params_menu: Vec<LSPS2RawOpeningFeeParams>,
639640
) -> Result<(), APIError> {
641+
let mut message_queue_notifier = self.pending_messages.notifier();
640642
let (result, response) = {
641643
let outer_state_lock = self.per_peer_state.read().unwrap();
642644

@@ -689,7 +691,7 @@ where
689691

690692
if let Some(response) = response {
691693
let msg = LSPS2Message::Response(request_id, response).into();
692-
self.pending_messages.enqueue(counterparty_node_id, msg);
694+
message_queue_notifier.enqueue(counterparty_node_id, msg);
693695
}
694696

695697
result
@@ -707,6 +709,8 @@ where
707709
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
708710
cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
709711
) -> Result<(), APIError> {
712+
let mut message_queue_notifier = self.pending_messages.notifier();
713+
710714
let (result, response) = {
711715
let outer_state_lock = self.per_peer_state.read().unwrap();
712716

@@ -767,7 +771,7 @@ where
767771

768772
if let Some(response) = response {
769773
let msg = LSPS2Message::Response(request_id, response).into();
770-
self.pending_messages.enqueue(counterparty_node_id, msg);
774+
message_queue_notifier.enqueue(counterparty_node_id, msg);
771775
}
772776

773777
result
@@ -1202,11 +1206,16 @@ where
12021206
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
12031207
params: LSPS2GetInfoRequest,
12041208
) -> Result<(), LightningError> {
1209+
let mut message_queue_notifier = self.pending_messages.notifier();
12051210
let event_queue_notifier = self.pending_events.notifier();
12061211
let (result, response) = {
12071212
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1208-
let inner_state_lock =
1209-
get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id);
1213+
let inner_state_lock = get_or_insert_peer_state_entry!(
1214+
self,
1215+
outer_state_lock,
1216+
message_queue_notifier,
1217+
counterparty_node_id
1218+
);
12101219
let mut peer_state_lock = inner_state_lock.lock().unwrap();
12111220
let request = LSPS2Request::GetInfo(params.clone());
12121221
match self.insert_pending_request(
@@ -1229,7 +1238,7 @@ where
12291238
};
12301239

12311240
if let Some(msg) = response {
1232-
self.pending_messages.enqueue(counterparty_node_id, msg);
1241+
message_queue_notifier.enqueue(counterparty_node_id, msg);
12331242
}
12341243

12351244
result
@@ -1238,6 +1247,7 @@ where
12381247
fn handle_buy_request(
12391248
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
12401249
) -> Result<(), LightningError> {
1250+
let mut message_queue_notifier = self.pending_messages.notifier();
12411251
let event_queue_notifier = self.pending_events.notifier();
12421252
if let Some(payment_size_msat) = params.payment_size_msat {
12431253
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
@@ -1247,7 +1257,7 @@ where
12471257
data: None,
12481258
});
12491259
let msg = LSPS2Message::Response(request_id, response).into();
1250-
self.pending_messages.enqueue(counterparty_node_id, msg);
1260+
message_queue_notifier.enqueue(counterparty_node_id, msg);
12511261

12521262
return Err(LightningError {
12531263
err: "payment size is below our minimum supported payment size".to_string(),
@@ -1262,7 +1272,7 @@ where
12621272
data: None,
12631273
});
12641274
let msg = LSPS2Message::Response(request_id, response).into();
1265-
self.pending_messages.enqueue(counterparty_node_id, msg);
1275+
message_queue_notifier.enqueue(counterparty_node_id, msg);
12661276
return Err(LightningError {
12671277
err: "payment size is above our maximum supported payment size".to_string(),
12681278
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1283,7 +1293,7 @@ where
12831293
data: None,
12841294
});
12851295
let msg = LSPS2Message::Response(request_id, response).into();
1286-
self.pending_messages.enqueue(counterparty_node_id, msg);
1296+
message_queue_notifier.enqueue(counterparty_node_id, msg);
12871297
return Err(LightningError {
12881298
err: "payment size is too small to cover the opening fee".to_string(),
12891299
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1297,7 +1307,7 @@ where
12971307
data: None,
12981308
});
12991309
let msg = LSPS2Message::Response(request_id, response).into();
1300-
self.pending_messages.enqueue(counterparty_node_id, msg);
1310+
message_queue_notifier.enqueue(counterparty_node_id, msg);
13011311
return Err(LightningError {
13021312
err: "overflow error when calculating opening_fee".to_string(),
13031313
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1314,7 +1324,7 @@ where
13141324
data: None,
13151325
});
13161326
let msg = LSPS2Message::Response(request_id, response).into();
1317-
self.pending_messages.enqueue(counterparty_node_id, msg);
1327+
message_queue_notifier.enqueue(counterparty_node_id, msg);
13181328
return Err(LightningError {
13191329
err: "invalid opening fee parameters were supplied by client".to_string(),
13201330
action: ErrorAction::IgnoreAndLog(Level::Info),
@@ -1323,8 +1333,12 @@ where
13231333

13241334
let (result, response) = {
13251335
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1326-
let inner_state_lock =
1327-
get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id);
1336+
let inner_state_lock = get_or_insert_peer_state_entry!(
1337+
self,
1338+
outer_state_lock,
1339+
message_queue_notifier,
1340+
counterparty_node_id
1341+
);
13281342
let mut peer_state_lock = inner_state_lock.lock().unwrap();
13291343

13301344
let request = LSPS2Request::Buy(params.clone());
@@ -1350,7 +1364,7 @@ where
13501364
};
13511365

13521366
if let Some(msg) = response {
1353-
self.pending_messages.enqueue(counterparty_node_id, msg);
1367+
message_queue_notifier.enqueue(counterparty_node_id, msg);
13541368
}
13551369

13561370
result

0 commit comments

Comments
 (0)