Skip to content

Commit 7f2d625

Browse files
committed
Introduce Runtime object allowng to detect outer runtime context
Instead of holding an `Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>` and dealing with stuff like `tokio::task::block_in_place` at all callsites, we introduce a `Runtime` object that takes care of the state transitions, and allows to detect and reuse an outer runtime context. We also adjust the `with_runtime` API to take a `tokio::runtime::Handle` rather than an `Arc<Runtime>`.
1 parent 9151340 commit 7f2d625

File tree

15 files changed

+413
-275
lines changed

15 files changed

+413
-275
lines changed

bindings/ldk_node.udl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ dictionary LogRecord {
6464

6565
[Trait, WithForeign]
6666
interface LogWriter {
67-
void log(LogRecord record);
67+
void log(LogRecord record);
6868
};
6969

7070
interface Builder {
@@ -160,8 +160,8 @@ interface Node {
160160

161161
[Enum]
162162
interface Bolt11InvoiceDescription {
163-
Hash(string hash);
164-
Direct(string description);
163+
Hash(string hash);
164+
Direct(string description);
165165
};
166166

167167
interface Bolt11Payment {
@@ -252,6 +252,7 @@ interface LSPS1Liquidity {
252252
enum NodeError {
253253
"AlreadyRunning",
254254
"NotRunning",
255+
"RuntimeSetupFailed",
255256
"OnchainTxCreationFailed",
256257
"ConnectionFailed",
257258
"InvoiceCreationFailed",

src/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::liquidity::{
2727
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
2828
use crate::message_handler::NodeCustomMessageHandler;
2929
use crate::peer_store::PeerStore;
30+
use crate::runtime::Runtime;
3031
use crate::tx_broadcaster::TransactionBroadcaster;
3132
use crate::types::{
3233
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
@@ -1101,7 +1102,7 @@ fn build_with_store_internal(
11011102
},
11021103
};
11031104

1104-
let runtime = Arc::new(RwLock::new(None));
1105+
let runtime = Arc::new(Runtime::new(Arc::clone(&logger)));
11051106

11061107
// Initialize the ChainMonitor
11071108
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(

src/chain/electrum.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::fee_estimator::{
1515
ConfirmationTarget,
1616
};
1717
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
18+
use crate::runtime::Runtime;
1819

1920
use lightning::chain::{Confirm, Filter, WatchedOutput};
2021
use lightning::util::ser::Writeable;
@@ -46,15 +47,14 @@ pub(crate) struct ElectrumRuntimeClient {
4647
electrum_client: Arc<ElectrumClient>,
4748
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
4849
tx_sync: Arc<ElectrumSyncClient<Arc<Logger>>>,
49-
runtime: Arc<tokio::runtime::Runtime>,
50+
runtime: Arc<Runtime>,
5051
config: Arc<Config>,
5152
logger: Arc<Logger>,
5253
}
5354

5455
impl ElectrumRuntimeClient {
5556
pub(crate) fn new(
56-
server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
57-
logger: Arc<Logger>,
57+
server_url: String, runtime: Arc<Runtime>, config: Arc<Config>, logger: Arc<Logger>,
5858
) -> Result<Self, Error> {
5959
let electrum_config = ElectrumConfigBuilder::new()
6060
.retry(ELECTRUM_CLIENT_NUM_RETRIES)
@@ -88,7 +88,7 @@ impl ElectrumRuntimeClient {
8888
let now = Instant::now();
8989

9090
let tx_sync = Arc::clone(&self.tx_sync);
91-
let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables));
91+
let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables))?;
9292
let timeout_fut =
9393
tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
9494

@@ -130,7 +130,7 @@ impl ElectrumRuntimeClient {
130130
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
131131
true,
132132
)
133-
});
133+
})?;
134134
let wallet_sync_timeout_fut =
135135
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
136136

@@ -159,7 +159,7 @@ impl ElectrumRuntimeClient {
159159

160160
let spawn_fut = self.runtime.spawn_blocking(move || {
161161
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
162-
});
162+
})?;
163163
let wallet_sync_timeout_fut =
164164
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
165165

@@ -185,8 +185,13 @@ impl ElectrumRuntimeClient {
185185
let txid = tx.compute_txid();
186186
let tx_bytes = tx.encode();
187187

188-
let spawn_fut =
189-
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));
188+
let spawn_fut = if let Ok(spawn_fut) =
189+
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx))
190+
{
191+
spawn_fut
192+
} else {
193+
return;
194+
};
190195

191196
let timeout_fut =
192197
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);
@@ -233,7 +238,7 @@ impl ElectrumRuntimeClient {
233238
batch.estimate_fee(num_blocks);
234239
}
235240

236-
let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch));
241+
let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch))?;
237242

238243
let timeout_fut = tokio::time::timeout(
239244
Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS),

src/chain/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::fee_estimator::{
2424
};
2525
use crate::io::utils::write_node_metrics;
2626
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
27+
use crate::runtime::Runtime;
2728
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2829
use crate::{Error, NodeMetrics};
2930

@@ -126,7 +127,7 @@ impl ElectrumRuntimeStatus {
126127
}
127128

128129
pub(crate) fn start(
129-
&mut self, server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
130+
&mut self, server_url: String, runtime: Arc<Runtime>, config: Arc<Config>,
130131
logger: Arc<Logger>,
131132
) -> Result<(), Error> {
132133
match self {
@@ -311,7 +312,7 @@ impl ChainSource {
311312
}
312313
}
313314

314-
pub(crate) fn start(&self, runtime: Arc<tokio::runtime::Runtime>) -> Result<(), Error> {
315+
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
315316
match self {
316317
Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => {
317318
electrum_runtime_status.write().unwrap().start(

src/error.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use crate::runtime::RuntimeError;
9+
810
use bdk_chain::bitcoin::psbt::ExtractTxError as BdkExtractTxError;
911
use bdk_chain::local_chain::CannotConnectError as BdkChainConnectionError;
1012
use bdk_chain::tx_graph::CalculateFeeError as BdkChainCalculateFeeError;
@@ -20,6 +22,8 @@ pub enum Error {
2022
AlreadyRunning,
2123
/// Returned when trying to stop [`crate::Node`] while it is not running.
2224
NotRunning,
25+
/// An attempt to setup a runtime has failed.
26+
RuntimeSetupFailed,
2327
/// An on-chain transaction could not be created.
2428
OnchainTxCreationFailed,
2529
/// A network connection has been closed.
@@ -127,6 +131,7 @@ impl fmt::Display for Error {
127131
match *self {
128132
Self::AlreadyRunning => write!(f, "Node is already running."),
129133
Self::NotRunning => write!(f, "Node is not running."),
134+
Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."),
130135
Self::OnchainTxCreationFailed => {
131136
write!(f, "On-chain transaction could not be created.")
132137
},
@@ -199,6 +204,16 @@ impl fmt::Display for Error {
199204

200205
impl std::error::Error for Error {}
201206

207+
impl From<RuntimeError> for Error {
208+
fn from(runtime_error: RuntimeError) -> Self {
209+
match runtime_error {
210+
RuntimeError::SetupFailed => Self::RuntimeSetupFailed,
211+
RuntimeError::AlreadyRunning => Self::AlreadyRunning,
212+
RuntimeError::NotRunning => Self::NotRunning,
213+
}
214+
}
215+
}
216+
202217
impl From<BdkSignerError> for Error {
203218
fn from(_: BdkSignerError) -> Self {
204219
Self::OnchainTxSigningFailed

src/event.rs

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::io::{
2929
};
3030
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
3131

32+
use crate::runtime::{Runtime, RuntimeError};
33+
3234
use lightning::events::bump_transaction::BumpTransactionEvent;
3335
use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
3436
use lightning::events::{Event as LdkEvent, PaymentFailureReason};
@@ -53,7 +55,7 @@ use core::future::Future;
5355
use core::task::{Poll, Waker};
5456
use std::collections::VecDeque;
5557
use std::ops::Deref;
56-
use std::sync::{Arc, Condvar, Mutex, RwLock};
58+
use std::sync::{Arc, Condvar, Mutex};
5759
use std::time::Duration;
5860

5961
/// An event emitted by [`Node`], which should be handled by the user.
@@ -451,7 +453,7 @@ where
451453
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
452454
payment_store: Arc<PaymentStore>,
453455
peer_store: Arc<PeerStore<L>>,
454-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
456+
runtime: Arc<Runtime>,
455457
logger: L,
456458
config: Arc<Config>,
457459
}
@@ -466,8 +468,8 @@ where
466468
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467469
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468470
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
470-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471+
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
472+
logger: L, config: Arc<Config>,
471473
) -> Self {
472474
Self {
473475
event_queue,
@@ -1049,16 +1051,16 @@ where
10491051
let forwarding_channel_manager = self.channel_manager.clone();
10501052
let min = time_forwardable.as_millis() as u64;
10511053

1052-
let runtime_lock = self.runtime.read().unwrap();
1053-
debug_assert!(runtime_lock.is_some());
1054+
let future = async move {
1055+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1056+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
10541057

1055-
if let Some(runtime) = runtime_lock.as_ref() {
1056-
runtime.spawn(async move {
1057-
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1058-
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
1058+
forwarding_channel_manager.process_pending_htlc_forwards();
1059+
};
10591060

1060-
forwarding_channel_manager.process_pending_htlc_forwards();
1061-
});
1061+
if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) {
1062+
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1063+
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
10621064
}
10631065
},
10641066
LdkEvent::SpendableOutputs { outputs, channel_id } => {
@@ -1419,30 +1421,29 @@ where
14191421
debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
14201422
},
14211423
LdkEvent::ConnectionNeeded { node_id, addresses } => {
1422-
let runtime_lock = self.runtime.read().unwrap();
1423-
debug_assert!(runtime_lock.is_some());
1424-
1425-
if let Some(runtime) = runtime_lock.as_ref() {
1426-
let spawn_logger = self.logger.clone();
1427-
let spawn_cm = Arc::clone(&self.connection_manager);
1428-
runtime.spawn(async move {
1429-
for addr in &addresses {
1430-
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1431-
Ok(()) => {
1432-
return;
1433-
},
1434-
Err(e) => {
1435-
log_error!(
1436-
spawn_logger,
1437-
"Failed to establish connection to peer {}@{}: {}",
1438-
node_id,
1439-
addr,
1440-
e
1441-
);
1442-
},
1443-
}
1424+
let spawn_logger = self.logger.clone();
1425+
let spawn_cm = Arc::clone(&self.connection_manager);
1426+
let future = async move {
1427+
for addr in &addresses {
1428+
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1429+
Ok(()) => {
1430+
return;
1431+
},
1432+
Err(e) => {
1433+
log_error!(
1434+
spawn_logger,
1435+
"Failed to establish connection to peer {}@{}: {}",
1436+
node_id,
1437+
addr,
1438+
e
1439+
);
1440+
},
14441441
}
1445-
});
1442+
}
1443+
};
1444+
if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) {
1445+
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1446+
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
14461447
}
14471448
},
14481449
LdkEvent::BumpTransaction(bte) => {

src/gossip.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@
88
use crate::chain::ChainSource;
99
use crate::config::RGS_SYNC_TIMEOUT_SECS;
1010
use crate::logger::{log_error, log_trace, LdkLogger, Logger};
11+
use crate::runtime::{Runtime, RuntimeError};
1112
use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup};
1213
use crate::Error;
1314

1415
use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier};
1516

1617
use std::future::Future;
1718
use std::sync::atomic::{AtomicU32, Ordering};
18-
use std::sync::{Arc, RwLock};
19+
use std::sync::Arc;
1920
use std::time::Duration;
2021

2122
pub(crate) enum GossipSource {
@@ -63,7 +64,7 @@ impl GossipSource {
6364

6465
pub(crate) fn set_gossip_verifier(
6566
&self, chain_source: Arc<ChainSource>, peer_manager: Arc<PeerManager>,
66-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
67+
runtime: Arc<Runtime>,
6768
) {
6869
match self {
6970
Self::P2PNetwork { gossip_sync, logger } => {
@@ -133,28 +134,21 @@ impl GossipSource {
133134
}
134135

135136
pub(crate) struct RuntimeSpawner {
136-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
137+
runtime: Arc<Runtime>,
137138
logger: Arc<Logger>,
138139
}
139140

140141
impl RuntimeSpawner {
141-
pub(crate) fn new(
142-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: Arc<Logger>,
143-
) -> Self {
142+
pub(crate) fn new(runtime: Arc<Runtime>, logger: Arc<Logger>) -> Self {
144143
Self { runtime, logger }
145144
}
146145
}
147146

148147
impl FutureSpawner for RuntimeSpawner {
149148
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
150-
let rt_lock = self.runtime.read().unwrap();
151-
if rt_lock.is_none() {
149+
if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) {
152150
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
153151
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
154-
return;
155152
}
156-
157-
let runtime = rt_lock.as_ref().unwrap();
158-
runtime.spawn(future);
159153
}
160154
}

0 commit comments

Comments
 (0)