Skip to content

Commit 36a8b33

Browse files
joostjager and claude committed
Add DeferredChainMonitor wrapper for batched monitor persistence
Introduce DeferredChainMonitor, a wrapper around ChainMonitor that queues watch_channel and update_channel operations, returning InProgress until flush() is called. This enables batched persistence of monitor updates after ChannelManager persistence, ensuring correct ordering. The wrapper implements all public traits that ChainMonitor supports (Listen, Confirm, EventsProvider, etc.) as pass-throughs, allowing it to be used as a drop-in replacement. Includes comprehensive tests covering the full channel lifecycle with payment flows using DeferredChainMonitor. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 1f5cef4 commit 36a8b33

File tree

4 files changed

+1147
-292
lines changed

4 files changed

+1147
-292
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 42 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;
3232

3333
use lightning::chain;
3434
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35-
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
35+
use lightning::chain::chainmonitor::Persist;
36+
use lightning::chain::deferred::DeferredChainMonitor;
3637
#[cfg(feature = "std")]
3738
use lightning::events::EventHandler;
3839
#[cfg(feature = "std")]
@@ -853,7 +854,8 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
853854
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
854855
/// # fn disconnect_socket(&mut self) {}
855856
/// # }
856-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857+
/// # type InnerChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
858+
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857859
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
858860
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
859861
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -964,7 +966,9 @@ pub async fn process_events_async<
964966
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
965967
EventHandler: Fn(Event) -> EventHandlerFuture,
966968
ES: Deref,
967-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
969+
M: Deref<
970+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
971+
>,
968972
CM: Deref,
969973
OM: Deref,
970974
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1155,7 +1159,7 @@ where
11551159
// Capture the number of pending monitor writes before persisting the channel manager.
11561160
// We'll only flush this many writes after the manager is persisted, to avoid flushing
11571161
// monitor updates that arrived after the manager state was captured.
1158-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1162+
let pending_monitor_writes = chain_monitor.pending_operation_count();
11591163

11601164
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11611165
log_trace!(logger, "Persisting ChannelManager...");
@@ -1427,7 +1431,7 @@ where
14271431
.await?;
14281432

14291433
// Flush all pending monitor writes after final channel manager persistence.
1430-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1434+
let pending_monitor_writes = chain_monitor.pending_operation_count();
14311435
if pending_monitor_writes > 0 {
14321436
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
14331437
chain_monitor.flush(pending_monitor_writes);
@@ -1485,7 +1489,9 @@ pub async fn process_events_async_with_kv_store_sync<
14851489
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
14861490
EventHandler: Fn(Event) -> EventHandlerFuture,
14871491
ES: Deref,
1488-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1492+
M: Deref<
1493+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1494+
>,
14891495
CM: Deref,
14901496
OM: Deref,
14911497
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1600,7 +1606,15 @@ impl BackgroundProcessor {
16001606
ES: 'static + Deref + Send,
16011607
M: 'static
16021608
+ Deref<
1603-
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1609+
Target = DeferredChainMonitor<
1610+
<CM::Target as AChannelManager>::Signer,
1611+
CF,
1612+
T,
1613+
F,
1614+
L,
1615+
P,
1616+
ES,
1617+
>,
16041618
>
16051619
+ Send
16061620
+ Sync,
@@ -1744,7 +1758,7 @@ impl BackgroundProcessor {
17441758
}
17451759

17461760
// Capture the number of pending monitor writes before persisting the channel manager.
1747-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1761+
let pending_monitor_writes = chain_monitor.pending_operation_count();
17481762

17491763
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17501764
log_trace!(logger, "Persisting ChannelManager...");
@@ -1885,7 +1899,7 @@ impl BackgroundProcessor {
18851899
)?;
18861900

18871901
// Flush all pending monitor writes after final channel manager persistence.
1888-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1902+
let pending_monitor_writes = chain_monitor.pending_operation_count();
18891903
if pending_monitor_writes > 0 {
18901904
log_trace!(
18911905
logger,
@@ -1978,7 +1992,7 @@ mod tests {
19781992
use core::sync::atomic::{AtomicBool, Ordering};
19791993
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19801994
use lightning::chain::transaction::OutPoint;
1981-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1995+
use lightning::chain::{chainmonitor, deferred, BestBlock, Confirm, Filter};
19821996
use lightning::events::{Event, PathFailure, ReplayEvent};
19831997
use lightning::ln::channelmanager;
19841998
use lightning::ln::channelmanager::{
@@ -2068,7 +2082,7 @@ mod tests {
20682082
Arc<test_utils::TestLogger>,
20692083
>;
20702084

2071-
type ChainMonitor = chainmonitor::ChainMonitor<
2085+
type ChainMonitor = deferred::DeferredChainMonitor<
20722086
InMemorySigner,
20732087
Arc<test_utils::TestChainSource>,
20742088
Arc<test_utils::TestBroadcaster>,
@@ -2496,15 +2510,16 @@ mod tests {
24962510
let now = Duration::from_secs(genesis_block.header.time as u64);
24972511
let keys_manager =
24982512
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2499-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2513+
let inner_chain_monitor = chainmonitor::ChainMonitor::new(
25002514
Some(Arc::clone(&chain_source)),
25012515
Arc::clone(&tx_broadcaster),
25022516
Arc::clone(&logger),
25032517
Arc::clone(&fee_estimator),
25042518
Arc::clone(&kv_store),
25052519
Arc::clone(&keys_manager),
25062520
keys_manager.get_peer_storage_key(),
2507-
));
2521+
);
2522+
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(inner_chain_monitor));
25082523
let best_block = BestBlock::from_network(network);
25092524
let params = ChainParameters { network, best_block };
25102525
let manager = Arc::new(ChannelManager::new(
@@ -2640,19 +2655,25 @@ mod tests {
26402655
tx.clone(),
26412656
)
26422657
.unwrap();
2658+
// Flush deferred monitor operations so messages aren't held back
2659+
$node_a.chain_monitor.flush_all();
26432660
let msg_a = get_event_msg!(
26442661
$node_a,
26452662
MessageSendEvent::SendFundingCreated,
26462663
$node_b.node.get_our_node_id()
26472664
);
26482665
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2666+
// Flush node_b's monitor so it releases the FundingSigned message
2667+
$node_b.chain_monitor.flush_all();
26492668
get_event!($node_b, Event::ChannelPending);
26502669
let msg_b = get_event_msg!(
26512670
$node_b,
26522671
MessageSendEvent::SendFundingSigned,
26532672
$node_a.node.get_our_node_id()
26542673
);
26552674
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2675+
// Flush node_a's monitor for the final update
2676+
$node_a.chain_monitor.flush_all();
26562677
get_event!($node_a, Event::ChannelPending);
26572678
tx
26582679
}};
@@ -3099,11 +3120,17 @@ mod tests {
30993120
.node
31003121
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
31013122
.unwrap();
3123+
// Flush node_0's deferred monitor operations so the FundingCreated message is released
3124+
nodes[0].chain_monitor.flush_all();
31023125
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
31033126
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3127+
// Flush node_1's deferred monitor operations so events and FundingSigned are released
3128+
nodes[1].chain_monitor.flush_all();
31043129
get_event!(nodes[1], Event::ChannelPending);
31053130
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
31063131
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3132+
// Flush node_0's monitor for the funding_signed update
3133+
nodes[0].chain_monitor.flush_all();
31073134
channel_pending_recv
31083135
.recv_timeout(EVENT_DEADLINE)
31093136
.expect("ChannelPending not handled within deadline");
@@ -3164,6 +3191,8 @@ mod tests {
31643191
error_message.to_string(),
31653192
)
31663193
.unwrap();
3194+
// Flush the monitor update triggered by force close so the commitment tx is broadcast
3195+
nodes[0].chain_monitor.flush_all();
31673196
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31683197
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31693198

0 commit comments

Comments
 (0)