Skip to content

Commit d42f762

Browse files
committed
Introduce Runtime object and allow to take a tokio::runtime::Handle
1 parent 5586b69 commit d42f762

File tree

12 files changed

+338
-271
lines changed

12 files changed

+338
-271
lines changed

src/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}
2525
use crate::message_handler::NodeCustomMessageHandler;
2626
use crate::payment::store::PaymentStore;
2727
use crate::peer_store::PeerStore;
28+
use crate::runtime::Runtime;
2829
use crate::tx_broadcaster::TransactionBroadcaster;
2930
use crate::types::{
3031
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
@@ -1095,7 +1096,7 @@ fn build_with_store_internal(
10951096
},
10961097
};
10971098

1098-
let runtime = Arc::new(RwLock::new(None));
1099+
let runtime = Arc::new(Runtime::new());
10991100

11001101
// Initialize the ChainMonitor
11011102
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(

src/chain/electrum.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::fee_estimator::{
1515
ConfirmationTarget,
1616
};
1717
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
18+
use crate::runtime::Runtime;
1819

1920
use lightning::chain::{Confirm, Filter, WatchedOutput};
2021
use lightning::util::ser::Writeable;
@@ -46,15 +47,14 @@ pub(crate) struct ElectrumRuntimeClient {
4647
electrum_client: Arc<ElectrumClient>,
4748
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
4849
tx_sync: Arc<ElectrumSyncClient<Arc<Logger>>>,
49-
runtime: Arc<tokio::runtime::Runtime>,
50+
runtime: Arc<Runtime>,
5051
config: Arc<Config>,
5152
logger: Arc<Logger>,
5253
}
5354

5455
impl ElectrumRuntimeClient {
5556
pub(crate) fn new(
56-
server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
57-
logger: Arc<Logger>,
57+
server_url: String, runtime: Arc<Runtime>, config: Arc<Config>, logger: Arc<Logger>,
5858
) -> Result<Self, Error> {
5959
let electrum_config = ElectrumConfigBuilder::new()
6060
.retry(ELECTRUM_CLIENT_NUM_RETRIES)
@@ -88,7 +88,7 @@ impl ElectrumRuntimeClient {
8888
let now = Instant::now();
8989

9090
let tx_sync = Arc::clone(&self.tx_sync);
91-
let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables));
91+
let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables))?;
9292
let timeout_fut =
9393
tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
9494

@@ -130,7 +130,7 @@ impl ElectrumRuntimeClient {
130130
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
131131
true,
132132
)
133-
});
133+
})?;
134134
let wallet_sync_timeout_fut =
135135
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
136136

@@ -159,7 +159,7 @@ impl ElectrumRuntimeClient {
159159

160160
let spawn_fut = self.runtime.spawn_blocking(move || {
161161
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
162-
});
162+
})?;
163163
let wallet_sync_timeout_fut =
164164
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
165165

@@ -185,8 +185,13 @@ impl ElectrumRuntimeClient {
185185
let txid = tx.compute_txid();
186186
let tx_bytes = tx.encode();
187187

188-
let spawn_fut =
189-
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));
188+
let spawn_fut = if let Ok(spawn_fut) =
189+
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx))
190+
{
191+
spawn_fut
192+
} else {
193+
return;
194+
};
190195

191196
let timeout_fut =
192197
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);
@@ -233,7 +238,7 @@ impl ElectrumRuntimeClient {
233238
batch.estimate_fee(num_blocks);
234239
}
235240

236-
let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch));
241+
let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch))?;
237242

238243
let timeout_fut = tokio::time::timeout(
239244
Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS),

src/chain/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::fee_estimator::{
2424
};
2525
use crate::io::utils::write_node_metrics;
2626
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
27+
use crate::runtime::Runtime;
2728
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2829
use crate::{Error, NodeMetrics};
2930

@@ -126,7 +127,7 @@ impl ElectrumRuntimeStatus {
126127
}
127128

128129
pub(crate) fn start(
129-
&mut self, server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
130+
&mut self, server_url: String, runtime: Arc<Runtime>, config: Arc<Config>,
130131
logger: Arc<Logger>,
131132
) -> Result<(), Error> {
132133
match self {
@@ -311,7 +312,7 @@ impl ChainSource {
311312
}
312313
}
313314

314-
pub(crate) fn start(&self, runtime: Arc<tokio::runtime::Runtime>) -> Result<(), Error> {
315+
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
315316
match self {
316317
Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => {
317318
electrum_runtime_status.write().unwrap().start(

src/event.rs

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::io::{
2929
};
3030
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
3131

32+
use crate::runtime::Runtime;
33+
3234
use lightning::events::bump_transaction::BumpTransactionEvent;
3335
use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
3436
use lightning::events::{Event as LdkEvent, PaymentFailureReason};
@@ -53,7 +55,7 @@ use core::future::Future;
5355
use core::task::{Poll, Waker};
5456
use std::collections::VecDeque;
5557
use std::ops::Deref;
56-
use std::sync::{Arc, Condvar, Mutex, RwLock};
58+
use std::sync::{Arc, Condvar, Mutex};
5759
use std::time::Duration;
5860

5961
/// An event emitted by [`Node`], which should be handled by the user.
@@ -451,7 +453,7 @@ where
451453
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
452454
payment_store: Arc<PaymentStore<L>>,
453455
peer_store: Arc<PeerStore<L>>,
454-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
456+
runtime: Arc<Runtime>,
455457
logger: L,
456458
config: Arc<Config>,
457459
}
@@ -466,8 +468,8 @@ where
466468
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467469
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468470
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469-
payment_store: Arc<PaymentStore<L>>, peer_store: Arc<PeerStore<L>>,
470-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471+
payment_store: Arc<PaymentStore<L>>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
472+
logger: L, config: Arc<Config>,
471473
) -> Self {
472474
Self {
473475
event_queue,
@@ -1050,16 +1052,16 @@ where
10501052
let forwarding_channel_manager = self.channel_manager.clone();
10511053
let min = time_forwardable.as_millis() as u64;
10521054

1053-
let runtime_lock = self.runtime.read().unwrap();
1054-
debug_assert!(runtime_lock.is_some());
1055+
let future = async move {
1056+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1057+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
10551058

1056-
if let Some(runtime) = runtime_lock.as_ref() {
1057-
runtime.spawn(async move {
1058-
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1059-
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
1059+
forwarding_channel_manager.process_pending_htlc_forwards();
1060+
};
10601061

1061-
forwarding_channel_manager.process_pending_htlc_forwards();
1062-
});
1062+
if let Err(Error::NotRunning) = self.runtime.spawn(future) {
1063+
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1064+
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
10631065
}
10641066
},
10651067
LdkEvent::SpendableOutputs { outputs, channel_id } => {
@@ -1420,30 +1422,29 @@ where
14201422
debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
14211423
},
14221424
LdkEvent::ConnectionNeeded { node_id, addresses } => {
1423-
let runtime_lock = self.runtime.read().unwrap();
1424-
debug_assert!(runtime_lock.is_some());
1425-
1426-
if let Some(runtime) = runtime_lock.as_ref() {
1427-
let spawn_logger = self.logger.clone();
1428-
let spawn_cm = Arc::clone(&self.connection_manager);
1429-
runtime.spawn(async move {
1430-
for addr in &addresses {
1431-
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1432-
Ok(()) => {
1433-
return;
1434-
},
1435-
Err(e) => {
1436-
log_error!(
1437-
spawn_logger,
1438-
"Failed to establish connection to peer {}@{}: {}",
1439-
node_id,
1440-
addr,
1441-
e
1442-
);
1443-
},
1444-
}
1425+
let spawn_logger = self.logger.clone();
1426+
let spawn_cm = Arc::clone(&self.connection_manager);
1427+
let future = async move {
1428+
for addr in &addresses {
1429+
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1430+
Ok(()) => {
1431+
return;
1432+
},
1433+
Err(e) => {
1434+
log_error!(
1435+
spawn_logger,
1436+
"Failed to establish connection to peer {}@{}: {}",
1437+
node_id,
1438+
addr,
1439+
e
1440+
);
1441+
},
14451442
}
1446-
});
1443+
}
1444+
};
1445+
if let Err(Error::NotRunning) = self.runtime.spawn(future) {
1446+
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1447+
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
14471448
}
14481449
},
14491450
LdkEvent::BumpTransaction(bte) => {

src/gossip.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@
88
use crate::chain::ChainSource;
99
use crate::config::RGS_SYNC_TIMEOUT_SECS;
1010
use crate::logger::{log_error, log_trace, LdkLogger, Logger};
11+
use crate::runtime::Runtime;
1112
use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup};
1213
use crate::Error;
1314

1415
use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier};
1516

1617
use std::future::Future;
1718
use std::sync::atomic::{AtomicU32, Ordering};
18-
use std::sync::{Arc, RwLock};
19+
use std::sync::Arc;
1920
use std::time::Duration;
2021

2122
pub(crate) enum GossipSource {
@@ -63,7 +64,7 @@ impl GossipSource {
6364

6465
pub(crate) fn set_gossip_verifier(
6566
&self, chain_source: Arc<ChainSource>, peer_manager: Arc<PeerManager>,
66-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
67+
runtime: Arc<Runtime>,
6768
) {
6869
match self {
6970
Self::P2PNetwork { gossip_sync, logger } => {
@@ -133,28 +134,21 @@ impl GossipSource {
133134
}
134135

135136
pub(crate) struct RuntimeSpawner {
136-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
137+
runtime: Arc<Runtime>,
137138
logger: Arc<Logger>,
138139
}
139140

140141
impl RuntimeSpawner {
141-
pub(crate) fn new(
142-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: Arc<Logger>,
143-
) -> Self {
142+
pub(crate) fn new(runtime: Arc<Runtime>, logger: Arc<Logger>) -> Self {
144143
Self { runtime, logger }
145144
}
146145
}
147146

148147
impl FutureSpawner for RuntimeSpawner {
149148
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
150-
let rt_lock = self.runtime.read().unwrap();
151-
if rt_lock.is_none() {
149+
if let Err(Error::NotRunning) = self.runtime.spawn(future) {
152150
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
153151
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
154-
return;
155152
}
156-
157-
let runtime = rt_lock.as_ref().unwrap();
158-
runtime.spawn(future);
159153
}
160154
}

0 commit comments

Comments
 (0)