From 9ea288bf709b2b0d2e55b8d2fd373c0c1f869c96 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 26 May 2025 15:48:45 +0200 Subject: [PATCH 01/10] Fix `module_path`/`line` numbers in `log` facade logging Previously, we'd reuse the `log` facade macros which would always log the line number in `logger.rs`, not at the line number given at the record. Here, we rectify this by utilizing `log`'s `RecordBuilder` to craft a record and directly giving it to the configured logger. --- src/logger.rs | 43 ++++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/src/logger.rs b/src/logger.rs index 073aa92bc..d357f018d 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -13,7 +13,8 @@ pub(crate) use lightning::{log_bytes, log_debug, log_error, log_info, log_trace} pub use lightning::util::logger::Level as LogLevel; use chrono::Utc; -use log::{debug, error, info, trace, warn}; +use log::Level as LogFacadeLevel; +use log::Record as LogFacadeRecord; #[cfg(not(feature = "uniffi"))] use core::fmt; @@ -139,20 +140,32 @@ impl LogWriter for Writer { .expect("Failed to write to log file") }, Writer::LogFacadeWriter => { - macro_rules! log_with_level { - ($log_level:expr, $target: expr, $($args:tt)*) => { - match $log_level { - LogLevel::Gossip | LogLevel::Trace => trace!(target: $target, $($args)*), - LogLevel::Debug => debug!(target: $target, $($args)*), - LogLevel::Info => info!(target: $target, $($args)*), - LogLevel::Warn => warn!(target: $target, $($args)*), - LogLevel::Error => error!(target: $target, $($args)*), - } - }; - } - - let target = format!("[{}:{}]", record.module_path, record.line); - log_with_level!(record.level, &target, " {}", record.args) + let mut builder = LogFacadeRecord::builder(); + + match record.level { + LogLevel::Gossip | LogLevel::Trace => builder.level(LogFacadeLevel::Trace), + LogLevel::Debug => builder.level(LogFacadeLevel::Debug), + LogLevel::Info => builder.level(LogFacadeLevel::Info), + LogLevel::Warn => builder.level(LogFacadeLevel::Warn), + LogLevel::Error => builder.level(LogFacadeLevel::Error), + }; + + #[cfg(not(feature = "uniffi"))] + log::logger().log( + &builder + .module_path(Some(record.module_path)) + .line(Some(record.line)) + .args(format_args!("{}", record.args)) + .build(), + ); + #[cfg(feature = "uniffi")] + log::logger().log( + &builder + .module_path(Some(&record.module_path)) + .line(Some(record.line)) + .args(format_args!("{}", record.args)) + .build(), + ); }, Writer::CustomWriter(custom_logger) => custom_logger.log(record), } From 62b076407ff1206bad7d9bfae580f3d825e68693 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 19 May 2025 16:27:28 +0200 Subject: [PATCH 02/10] Introduce `Runtime` object allowng to detect outer runtime context Instead of holding an `Arc>>` and dealing with stuff like `tokio::task::block_in_place` at all callsites, we introduce a `Runtime` object that takes care of the state transitions, and allows to detect and reuse an outer runtime context. We also adjust the `with_runtime` API to take a `tokio::runtime::Handle` rather than an `Arc`. 
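Note: for readers skimming the series, the runtime-detection idea described above is implemented in the new `src/runtime.rs` further down in this patch. The core of it is simply preferring an already-running Tokio runtime via `Handle::try_current()` and only building an owned multi-threaded runtime as a fallback. A minimal, self-contained sketch of that logic (names shortened for illustration; the actual patch wraps this in a `Runtime` struct with state transitions):

    enum RuntimeState {
        // We built and own the runtime ourselves.
        Owned(tokio::runtime::Runtime),
        // We detected and reuse an outer runtime context; we assume it is a
        // multi-threaded runtime, since `block_in_place` is used later on.
        Handle(tokio::runtime::Handle),
    }

    fn detect_or_build_runtime() -> std::io::Result<RuntimeState> {
        match tokio::runtime::Handle::try_current() {
            // Reuse the caller's runtime context if we're already inside one.
            Ok(handle) => Ok(RuntimeState::Handle(handle)),
            // Otherwise spin up our own multi-threaded runtime.
            Err(_) => {
                let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
                Ok(RuntimeState::Owned(rt))
            },
        }
    }
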
--- bindings/ldk_node.udl | 7 +- src/builder.rs | 3 +- src/chain/electrum.rs | 23 ++-- src/chain/mod.rs | 5 +- src/error.rs | 15 +++ src/event.rs | 71 +++++++------ src/gossip.rs | 18 ++-- src/lib.rs | 182 ++++++++++++++------------------ src/liquidity.rs | 59 ++++------- src/payment/bolt11.rs | 83 +++++++-------- src/payment/bolt12.rs | 17 ++- src/payment/onchain.rs | 15 ++- src/payment/spontaneous.rs | 17 ++- src/runtime.rs | 161 ++++++++++++++++++++++++++++ tests/integration_tests_rust.rs | 12 +++ 15 files changed, 413 insertions(+), 275 deletions(-) create mode 100644 src/runtime.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c2f0166c8..a85c2eca7 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -64,7 +64,7 @@ dictionary LogRecord { [Trait, WithForeign] interface LogWriter { - void log(LogRecord record); + void log(LogRecord record); }; interface Builder { @@ -160,8 +160,8 @@ interface Node { [Enum] interface Bolt11InvoiceDescription { - Hash(string hash); - Direct(string description); + Hash(string hash); + Direct(string description); }; interface Bolt11Payment { @@ -252,6 +252,7 @@ interface LSPS1Liquidity { enum NodeError { "AlreadyRunning", "NotRunning", + "RuntimeSetupFailed", "OnchainTxCreationFailed", "ConnectionFailed", "InvoiceCreationFailed", diff --git a/src/builder.rs b/src/builder.rs index 31a0fee45..77f643550 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -27,6 +27,7 @@ use crate::liquidity::{ use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::peer_store::PeerStore; +use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, @@ -1101,7 +1102,7 @@ fn build_with_store_internal( }, }; - let runtime = Arc::new(RwLock::new(None)); + let runtime = Arc::new(Runtime::new(Arc::clone(&logger))); // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 6e62d9c08..eece892a6 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -15,6 +15,7 @@ use crate::fee_estimator::{ ConfirmationTarget, }; use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::runtime::Runtime; use lightning::chain::{Confirm, Filter, WatchedOutput}; use lightning::util::ser::Writeable; @@ -46,15 +47,14 @@ pub(crate) struct ElectrumRuntimeClient { electrum_client: Arc, bdk_electrum_client: Arc>, tx_sync: Arc>>, - runtime: Arc, + runtime: Arc, config: Arc, logger: Arc, } impl ElectrumRuntimeClient { pub(crate) fn new( - server_url: String, runtime: Arc, config: Arc, - logger: Arc, + server_url: String, runtime: Arc, config: Arc, logger: Arc, ) -> Result { let electrum_config = ElectrumConfigBuilder::new() .retry(ELECTRUM_CLIENT_NUM_RETRIES) @@ -88,7 +88,7 @@ impl ElectrumRuntimeClient { let now = Instant::now(); let tx_sync = Arc::clone(&self.tx_sync); - let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables)); + let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables))?; let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut); @@ -130,7 +130,7 @@ impl ElectrumRuntimeClient { BDK_ELECTRUM_CLIENT_BATCH_SIZE, true, ) - }); + })?; let wallet_sync_timeout_fut = 
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut); @@ -159,7 +159,7 @@ impl ElectrumRuntimeClient { let spawn_fut = self.runtime.spawn_blocking(move || { bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true) - }); + })?; let wallet_sync_timeout_fut = tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut); @@ -185,8 +185,13 @@ impl ElectrumRuntimeClient { let txid = tx.compute_txid(); let tx_bytes = tx.encode(); - let spawn_fut = - self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx)); + let spawn_fut = if let Ok(spawn_fut) = + self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx)) + { + spawn_fut + } else { + return; + }; let timeout_fut = tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut); @@ -233,7 +238,7 @@ impl ElectrumRuntimeClient { batch.estimate_fee(num_blocks); } - let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch)); + let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch))?; let timeout_fut = tokio::time::timeout( Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 62627797e..31eb5f53d 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -24,6 +24,7 @@ use crate::fee_estimator::{ }; use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -126,7 +127,7 @@ impl ElectrumRuntimeStatus { } pub(crate) fn start( - &mut self, server_url: String, runtime: Arc, config: Arc, + &mut self, server_url: String, runtime: Arc, config: Arc, logger: Arc, ) -> Result<(), Error> { match self { @@ -311,7 +312,7 @@ impl ChainSource { } } - pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { + pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { match self { Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => { electrum_runtime_status.write().unwrap().start( diff --git a/src/error.rs b/src/error.rs index 2cb71186d..4f7a7bde3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,6 +5,8 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use crate::runtime::RuntimeError; + use bdk_chain::bitcoin::psbt::ExtractTxError as BdkExtractTxError; use bdk_chain::local_chain::CannotConnectError as BdkChainConnectionError; use bdk_chain::tx_graph::CalculateFeeError as BdkChainCalculateFeeError; @@ -20,6 +22,8 @@ pub enum Error { AlreadyRunning, /// Returned when trying to stop [`crate::Node`] while it is not running. NotRunning, + /// An attempt to setup a runtime has failed. + RuntimeSetupFailed, /// An on-chain transaction could not be created. OnchainTxCreationFailed, /// A network connection has been closed. 
@@ -127,6 +131,7 @@ impl fmt::Display for Error { match *self { Self::AlreadyRunning => write!(f, "Node is already running."), Self::NotRunning => write!(f, "Node is not running."), + Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."), Self::OnchainTxCreationFailed => { write!(f, "On-chain transaction could not be created.") }, @@ -199,6 +204,16 @@ impl fmt::Display for Error { impl std::error::Error for Error {} +impl From for Error { + fn from(runtime_error: RuntimeError) -> Self { + match runtime_error { + RuntimeError::SetupFailed => Self::RuntimeSetupFailed, + RuntimeError::AlreadyRunning => Self::AlreadyRunning, + RuntimeError::NotRunning => Self::NotRunning, + } + } +} + impl From for Error { fn from(_: BdkSignerError) -> Self { Self::OnchainTxSigningFailed diff --git a/src/event.rs b/src/event.rs index e95983710..2b372ed8c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -29,6 +29,8 @@ use crate::io::{ }; use crate::logger::{log_debug, log_error, log_info, LdkLogger}; +use crate::runtime::{Runtime, RuntimeError}; + use lightning::events::bump_transaction::BumpTransactionEvent; use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent}; use lightning::events::{Event as LdkEvent, PaymentFailureReason}; @@ -53,7 +55,7 @@ use core::future::Future; use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; /// An event emitted by [`Node`], which should be handled by the user. @@ -451,7 +453,7 @@ where liquidity_source: Option>>>, payment_store: Arc, peer_store: Arc>, - runtime: Arc>>>, + runtime: Arc, logger: L, config: Arc, } @@ -466,8 +468,8 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc, peer_store: Arc>, - runtime: Arc>>>, logger: L, config: Arc, + payment_store: Arc, peer_store: Arc>, runtime: Arc, + logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -1049,16 +1051,16 @@ where let forwarding_channel_manager = self.channel_manager.clone(); let min = time_forwardable.as_millis() as u64; - let runtime_lock = self.runtime.read().unwrap(); - debug_assert!(runtime_lock.is_some()); + let future = async move { + let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; + tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; - if let Some(runtime) = runtime_lock.as_ref() { - runtime.spawn(async move { - let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; - tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; + forwarding_channel_manager.process_pending_htlc_forwards(); + }; - forwarding_channel_manager.process_pending_htlc_forwards(); - }); + if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) { + log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); + debug_assert!(false, "Tried spawing a future while the runtime wasn't available. 
This should never happen."); } }, LdkEvent::SpendableOutputs { outputs, channel_id } => { @@ -1419,30 +1421,29 @@ where debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted."); }, LdkEvent::ConnectionNeeded { node_id, addresses } => { - let runtime_lock = self.runtime.read().unwrap(); - debug_assert!(runtime_lock.is_some()); - - if let Some(runtime) = runtime_lock.as_ref() { - let spawn_logger = self.logger.clone(); - let spawn_cm = Arc::clone(&self.connection_manager); - runtime.spawn(async move { - for addr in &addresses { - match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await { - Ok(()) => { - return; - }, - Err(e) => { - log_error!( - spawn_logger, - "Failed to establish connection to peer {}@{}: {}", - node_id, - addr, - e - ); - }, - } + let spawn_logger = self.logger.clone(); + let spawn_cm = Arc::clone(&self.connection_manager); + let future = async move { + for addr in &addresses { + match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await { + Ok(()) => { + return; + }, + Err(e) => { + log_error!( + spawn_logger, + "Failed to establish connection to peer {}@{}: {}", + node_id, + addr, + e + ); + }, } - }); + } + }; + if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) { + log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); + debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen."); } }, LdkEvent::BumpTransaction(bte) => { diff --git a/src/gossip.rs b/src/gossip.rs index a8a6e3831..18166fd3f 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -8,6 +8,7 @@ use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_error, log_trace, LdkLogger, Logger}; +use crate::runtime::{Runtime, RuntimeError}; use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup}; use crate::Error; @@ -15,7 +16,7 @@ use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier}; use std::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; pub(crate) enum GossipSource { @@ -63,7 +64,7 @@ impl GossipSource { pub(crate) fn set_gossip_verifier( &self, chain_source: Arc, peer_manager: Arc, - runtime: Arc>>>, + runtime: Arc, ) { match self { Self::P2PNetwork { gossip_sync, logger } => { @@ -133,28 +134,21 @@ impl GossipSource { } pub(crate) struct RuntimeSpawner { - runtime: Arc>>>, + runtime: Arc, logger: Arc, } impl RuntimeSpawner { - pub(crate) fn new( - runtime: Arc>>>, logger: Arc, - ) -> Self { + pub(crate) fn new(runtime: Arc, logger: Arc) -> Self { Self { runtime, logger } } } impl FutureSpawner for RuntimeSpawner { fn spawn + Send + 'static>(&self, future: T) { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) { log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); debug_assert!(false, "Tried spawing a future while the runtime wasn't available. 
This should never happen."); - return; } - - let runtime = rt_lock.as_ref().unwrap(); - runtime.spawn(future); } } diff --git a/src/lib.rs b/src/lib.rs index c3bfe16d8..4032dbdc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod runtime; mod sweep; mod tx_broadcaster; mod types; @@ -141,6 +142,7 @@ use payment::{ UnifiedQrPayment, }; use peer_store::{PeerInfo, PeerStore}; +use runtime::Runtime; use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, @@ -176,7 +178,7 @@ uniffi::include_scaffolding!("ldk_node"); /// /// Needs to be initialized and instantiated through [`Builder::build`]. pub struct Node { - runtime: Arc>>>, + runtime: Arc, stop_sender: tokio::sync::watch::Sender<()>, event_handling_stopped_sender: tokio::sync::watch::Sender<()>, config: Arc, @@ -208,30 +210,32 @@ impl Node { /// Starts the necessary background tasks, such as handling events coming from user input, /// LDK/BDK, and the peer-to-peer network. /// + /// This will try to auto-detect an outer pre-existing runtime, e.g., to avoid stacking Tokio + /// runtime contexts. Note we require the outer runtime to be of the `multithreaded` flavor. + /// /// After this returns, the [`Node`] instance can be controlled via the provided API methods in /// a thread-safe manner. pub fn start(&self) -> Result<(), Error> { - let runtime = - Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); - self.start_with_runtime(runtime) + self.runtime.start()?; + self.start_inner() } /// Starts the necessary background tasks (such as handling events coming from user input, /// LDK/BDK, and the peer-to-peer network) on the the given `runtime`. /// - /// This allows to have LDK Node reuse an outer pre-existing runtime, e.g., to avoid stacking Tokio - /// runtime contexts. + /// This allows to have LDK Node to specify an outer pre-existing runtime, e.g., to avoid + /// stacking Tokio runtime contexts. Note we require the runtime to be of the `multithreaded` + /// flavor. /// /// After this returns, the [`Node`] instance can be controlled via the provided API methods in /// a thread-safe manner. - pub fn start_with_runtime(&self, runtime: Arc) -> Result<(), Error> { - // Acquire a run lock and hold it until we're setup. - let mut runtime_lock = self.runtime.write().unwrap(); - if runtime_lock.is_some() { - // We're already running. - return Err(Error::AlreadyRunning); - } + pub fn start_with_runtime(&self, handle: tokio::runtime::Handle) -> Result<(), Error> { + self.runtime.start_from_handle(handle)?; + self.start_inner() + } + fn start_inner(&self) -> Result<(), Error> { + // Acquire a run lock and hold it until we're setup. log_info!( self.logger, "Starting up LDK Node with node ID {} on network: {}", @@ -240,17 +244,14 @@ impl Node { ); // Start up any runtime-dependant chain sources (e.g. 
Electrum) - self.chain_source.start(Arc::clone(&runtime)).map_err(|e| { + self.chain_source.start(Arc::clone(&self.runtime)).map_err(|e| { log_error!(self.logger, "Failed to start chain syncing: {}", e); e })?; // Block to ensure we update our fee rate cache once on startup let chain_source = Arc::clone(&self.chain_source); - let runtime_ref = &runtime; - tokio::task::block_in_place(move || { - runtime_ref.block_on(async move { chain_source.update_fee_rate_estimates().await }) - })?; + self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })??; // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); @@ -258,11 +259,11 @@ impl Node { let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); - runtime.spawn(async move { + self.runtime.spawn(async move { chain_source .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper) .await; - }); + })?; if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); @@ -270,7 +271,7 @@ impl Node { let gossip_sync_logger = Arc::clone(&self.logger); let gossip_node_metrics = Arc::clone(&self.node_metrics); let mut stop_gossip_sync = self.stop_sender.subscribe(); - runtime.spawn(async move { + self.runtime.spawn(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); loop { tokio::select! { @@ -311,7 +312,7 @@ impl Node { } } } - }); + })?; } if let Some(listening_addresses) = &self.config.listening_addresses { @@ -337,7 +338,7 @@ impl Node { bind_addrs.extend(resolved_address); } - runtime.spawn(async move { + self.runtime.spawn(async move { { let listener = tokio::net::TcpListener::bind(&*bind_addrs).await @@ -375,7 +376,7 @@ impl Node { } listening_indicator.store(false, Ordering::Release); - }); + })?; } // Regularly reconnect to persisted peers. @@ -384,7 +385,7 @@ impl Node { let connect_logger = Arc::clone(&self.logger); let connect_peer_store = Arc::clone(&self.peer_store); let mut stop_connect = self.stop_sender.subscribe(); - runtime.spawn(async move { + self.runtime.spawn(async move { let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { @@ -412,7 +413,7 @@ impl Node { } } } - }); + })?; // Regularly broadcast node announcements. let bcast_cm = Arc::clone(&self.channel_manager); @@ -424,7 +425,7 @@ impl Node { let mut stop_bcast = self.stop_sender.subscribe(); let node_alias = self.config.node_alias.clone(); if may_announce_channel(&self.config).is_ok() { - runtime.spawn(async move { + self.runtime.spawn(async move { // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. #[cfg(not(test))] let mut interval = tokio::time::interval(Duration::from_secs(30)); @@ -495,13 +496,13 @@ impl Node { } } } - }); + })?; } let mut stop_tx_bcast = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); let tx_bcast_logger = Arc::clone(&self.logger); - runtime.spawn(async move { + self.runtime.spawn(async move { // Every second we try to clear our broadcasting queue. 
let mut interval = tokio::time::interval(Duration::from_secs(1)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -519,7 +520,7 @@ impl Node { } } } - }); + })?; let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), @@ -578,7 +579,7 @@ impl Node { let background_stop_logger = Arc::clone(&self.logger); let event_handling_stopped_sender = self.event_handling_stopped_sender.clone(); - runtime.spawn(async move { + self.runtime.spawn(async move { process_events_async( background_persister, |e| background_event_handler.handle_event(e), @@ -611,13 +612,13 @@ impl Node { debug_assert!(false); }, } - }); + })?; if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); let liquidity_handler = Arc::clone(&liquidity_source); let liquidity_logger = Arc::clone(&self.logger); - runtime.spawn(async move { + self.runtime.spawn(async move { loop { tokio::select! { _ = stop_liquidity_handler.changed() => { @@ -630,11 +631,9 @@ impl Node { _ = liquidity_handler.handle_next_event() => {} } } - }); + })?; } - *runtime_lock = Some(runtime); - log_info!(self.logger, "Startup complete."); Ok(()) } @@ -643,9 +642,9 @@ impl Node { /// /// After this returns most API methods will return [`Error::NotRunning`]. pub fn stop(&self) -> Result<(), Error> { - let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?; - #[cfg(tokio_unstable)] - let metrics_runtime = Arc::clone(&runtime); + if !self.runtime.is_running() { + return Err(Error::NotRunning); + } log_info!(self.logger, "Shutting down LDK Node with node ID {}...", self.node_id()); @@ -675,15 +674,13 @@ impl Node { // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We // should drop this considerably post upgrading to BDK 1.0. - let timeout_res = tokio::task::block_in_place(move || { - runtime.block_on(async { - tokio::time::timeout( - Duration::from_secs(100), - event_handling_stopped_receiver.changed(), - ) - .await - }) - }); + let timeout_res = self.runtime.block_on(async { + tokio::time::timeout( + Duration::from_secs(100), + event_handling_stopped_receiver.changed(), + ) + .await + })?; match timeout_res { Ok(stop_res) => match stop_res { @@ -706,14 +703,7 @@ impl Node { }, } - #[cfg(tokio_unstable)] - { - log_trace!( - self.logger, - "Active runtime tasks left prior to shutdown: {}", - metrics_runtime.metrics().active_tasks_count() - ); - } + self.runtime.stop()?; log_info!(self.logger, "Shutdown complete."); Ok(()) @@ -721,7 +711,7 @@ impl Node { /// Returns the status of the [`Node`]. 
pub fn status(&self) -> NodeStatus { - let is_running = self.runtime.read().unwrap().is_some(); + let is_running = self.runtime.is_running(); let is_listening = self.is_listening.load(Ordering::Acquire); let current_best_block = self.channel_manager.current_best_block().into(); let locked_node_metrics = self.node_metrics.read().unwrap(); @@ -1012,11 +1002,9 @@ impl Node { pub fn connect( &self, node_id: PublicKey, address: SocketAddress, persist: bool, ) -> Result<(), Error> { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } - let runtime = rt_lock.as_ref().unwrap(); let peer_info = PeerInfo { node_id, address }; @@ -1026,11 +1014,9 @@ impl Node { // We need to use our main runtime here as a local runtime might not be around to poll // connection futures going forward. - tokio::task::block_in_place(move || { - runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - }) - })?; + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })??; log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.node_id, peer_info.address); @@ -1046,8 +1032,7 @@ impl Node { /// Will also remove the peer from the peer store, i.e., after this has been called we won't /// try to reconnect on restart. pub fn disconnect(&self, counterparty_node_id: PublicKey) -> Result<(), Error> { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } @@ -1069,11 +1054,9 @@ impl Node { push_to_counterparty_msat: Option, channel_config: Option, announce_for_forwarding: bool, ) -> Result { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } - let runtime = rt_lock.as_ref().unwrap(); let peer_info = PeerInfo { node_id, address }; @@ -1097,11 +1080,9 @@ impl Node { // We need to use our main runtime here as a local runtime might not be around to poll // connection futures going forward. - tokio::task::block_in_place(move || { - runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - }) - })?; + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })??; // Fail if we have less than the channel value + anchor reserve available (if applicable). let init_features = self @@ -1249,8 +1230,7 @@ impl Node { /// /// [`EsploraSyncConfig::background_sync_config`]: crate::config::EsploraSyncConfig::background_sync_config pub fn sync_wallets(&self) -> Result<(), Error> { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } @@ -1258,35 +1238,27 @@ impl Node { let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); - tokio::task::block_in_place(move || { - tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( - async move { - match chain_source.as_ref() { - ChainSource::Esplora { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) - .await?; - chain_source.sync_onchain_wallet().await?; - }, - ChainSource::Electrum { .. 
} => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) - .await?; - chain_source.sync_onchain_wallet().await?; - }, - ChainSource::BitcoindRpc { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) - .await?; - }, - } - Ok(()) + self.runtime.block_on(async move { + match chain_source.as_ref() { + ChainSource::Esplora { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; + chain_source.sync_onchain_wallet().await?; }, - ) - }) + ChainSource::Electrum { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; + chain_source.sync_onchain_wallet().await?; + }, + ChainSource::BitcoindRpc { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source + .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) + .await?; + }, + } + Ok(()) + })? } /// Close a previously opened channel. diff --git a/src/liquidity.rs b/src/liquidity.rs index 47f3dcce4..57cba64e1 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -10,6 +10,7 @@ use crate::chain::ChainSource; use crate::connection::ConnectionManager; use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger}; +use crate::runtime::Runtime; use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager, Wallet}; use crate::{total_anchor_channels_reserve_sats, Config, Error}; @@ -1388,7 +1389,7 @@ pub(crate) struct LSPS2BuyResponse { /// [`Bolt11Payment::receive_via_jit_channel`]: crate::payment::Bolt11Payment::receive_via_jit_channel #[derive(Clone)] pub struct LSPS1Liquidity { - runtime: Arc>>>, + runtime: Arc, wallet: Arc, connection_manager: Arc>>, liquidity_source: Option>>>, @@ -1397,7 +1398,7 @@ pub struct LSPS1Liquidity { impl LSPS1Liquidity { pub(crate) fn new( - runtime: Arc>>>, wallet: Arc, + runtime: Arc, wallet: Arc, connection_manager: Arc>>, liquidity_source: Option>>>, logger: Arc, ) -> Self { @@ -1418,39 +1419,32 @@ impl LSPS1Liquidity { let (lsp_node_id, lsp_address) = liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; - let rt_lock = self.runtime.read().unwrap(); - let runtime = rt_lock.as_ref().unwrap(); - let con_node_id = lsp_node_id; let con_addr = lsp_address.clone(); let con_cm = Arc::clone(&self.connection_manager); // We need to use our main runtime here as a local runtime might not be around to poll // connection futures going forward. - tokio::task::block_in_place(move || { - runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - }) - })?; + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })??; log_info!(self.logger, "Connected to LSP {}@{}. 
", lsp_node_id, lsp_address); let refund_address = self.wallet.get_new_address()?; let liquidity_source = Arc::clone(&liquidity_source); - let response = tokio::task::block_in_place(move || { - runtime.block_on(async move { - liquidity_source - .lsps1_request_channel( - lsp_balance_sat, - client_balance_sat, - channel_expiry_blocks, - announce_channel, - refund_address, - ) - .await - }) - })?; + let response = self.runtime.block_on(async move { + liquidity_source + .lsps1_request_channel( + lsp_balance_sat, + client_balance_sat, + channel_expiry_blocks, + announce_channel, + refund_address, + ) + .await + })??; Ok(response) } @@ -1463,27 +1457,20 @@ impl LSPS1Liquidity { let (lsp_node_id, lsp_address) = liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; - let rt_lock = self.runtime.read().unwrap(); - let runtime = rt_lock.as_ref().unwrap(); - let con_node_id = lsp_node_id; let con_addr = lsp_address.clone(); let con_cm = Arc::clone(&self.connection_manager); // We need to use our main runtime here as a local runtime might not be around to poll // connection futures going forward. - tokio::task::block_in_place(move || { - runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - }) - })?; + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })??; let liquidity_source = Arc::clone(&liquidity_source); - let response = tokio::task::block_in_place(move || { - runtime - .block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await }) - })?; - + let response = self + .runtime + .block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await })??; Ok(response) } } diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index 052571818..3049d8726 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -21,6 +21,7 @@ use crate::payment::store::{ }; use crate::payment::SendingParameters; use crate::peer_store::{PeerInfo, PeerStore}; +use crate::runtime::Runtime; use crate::types::{ChannelManager, PaymentStore}; use lightning::ln::bolt11_payment; @@ -37,7 +38,7 @@ use lightning_invoice::Bolt11InvoiceDescription as LdkBolt11InvoiceDescription; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; #[cfg(not(feature = "uniffi"))] type Bolt11Invoice = LdkBolt11Invoice; @@ -87,7 +88,7 @@ macro_rules! 
maybe_convert_description { /// [BOLT 11]: https://github.com/lightning/bolts/blob/master/11-payment-encoding.md /// [`Node::bolt11_payment`]: crate::Node::bolt11_payment pub struct Bolt11Payment { - runtime: Arc>>>, + runtime: Arc, channel_manager: Arc, connection_manager: Arc>>, liquidity_source: Option>>>, @@ -99,8 +100,7 @@ pub struct Bolt11Payment { impl Bolt11Payment { pub(crate) fn new( - runtime: Arc>>>, - channel_manager: Arc, + runtime: Arc, channel_manager: Arc, connection_manager: Arc>>, liquidity_source: Option>>>, payment_store: Arc, peer_store: Arc>>, @@ -126,8 +126,7 @@ impl Bolt11Payment { &self, invoice: &Bolt11Invoice, sending_parameters: Option, ) -> Result { let invoice = maybe_convert_invoice(invoice); - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } @@ -235,8 +234,7 @@ impl Bolt11Payment { sending_parameters: Option, ) -> Result { let invoice = maybe_convert_invoice(invoice); - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } @@ -649,9 +647,6 @@ impl Bolt11Payment { let (node_id, address) = liquidity_source.get_lsps2_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; - let rt_lock = self.runtime.read().unwrap(); - let runtime = rt_lock.as_ref().unwrap(); - let peer_info = PeerInfo { node_id, address }; let con_node_id = peer_info.node_id; @@ -660,40 +655,36 @@ impl Bolt11Payment { // We need to use our main runtime here as a local runtime might not be around to poll // connection futures going forward. - tokio::task::block_in_place(move || { - runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - }) - })?; + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })??; log_info!(self.logger, "Connected to LSP {}@{}. ", peer_info.node_id, peer_info.address); let liquidity_source = Arc::clone(&liquidity_source); let (invoice, lsp_total_opening_fee, lsp_prop_opening_fee) = - tokio::task::block_in_place(move || { - runtime.block_on(async move { - if let Some(amount_msat) = amount_msat { - liquidity_source - .lsps2_receive_to_jit_channel( - amount_msat, - description, - expiry_secs, - max_total_lsp_fee_limit_msat, - ) - .await - .map(|(invoice, total_fee)| (invoice, Some(total_fee), None)) - } else { - liquidity_source - .lsps2_receive_variable_amount_to_jit_channel( - description, - expiry_secs, - max_proportional_lsp_fee_limit_ppm_msat, - ) - .await - .map(|(invoice, prop_fee)| (invoice, None, Some(prop_fee))) - } - }) - })?; + self.runtime.block_on(async move { + if let Some(amount_msat) = amount_msat { + liquidity_source + .lsps2_receive_to_jit_channel( + amount_msat, + description, + expiry_secs, + max_total_lsp_fee_limit_msat, + ) + .await + .map(|(invoice, total_fee)| (invoice, Some(total_fee), None)) + } else { + liquidity_source + .lsps2_receive_variable_amount_to_jit_channel( + description, + expiry_secs, + max_proportional_lsp_fee_limit_ppm_msat, + ) + .await + .map(|(invoice, prop_fee)| (invoice, None, Some(prop_fee))) + } + })??; // Register payment in payment store. let payment_hash = PaymentHash(invoice.payment_hash().to_byte_array()); @@ -742,12 +733,12 @@ impl Bolt11Payment { /// amount times [`Config::probing_liquidity_limit_multiplier`] won't be used to send /// pre-flight probes. 
pub fn send_probes(&self, invoice: &Bolt11Invoice) -> Result<(), Error> { - let invoice = maybe_convert_invoice(invoice); - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } + let invoice = maybe_convert_invoice(invoice); + let (_payment_hash, _recipient_onion, route_params) = bolt11_payment::payment_parameters_from_invoice(&invoice).map_err(|_| { log_error!(self.logger, "Failed to send probes due to the given invoice being \"zero-amount\". Please use send_probes_using_amount instead."); Error::InvalidInvoice @@ -775,12 +766,12 @@ impl Bolt11Payment { pub fn send_probes_using_amount( &self, invoice: &Bolt11Invoice, amount_msat: u64, ) -> Result<(), Error> { - let invoice = maybe_convert_invoice(invoice); - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } + let invoice = maybe_convert_invoice(invoice); + let (_payment_hash, _recipient_onion, route_params) = if let Some(invoice_amount_msat) = invoice.amount_milli_satoshis() { diff --git a/src/payment/bolt12.rs b/src/payment/bolt12.rs index 8006f4bb9..d2d2a9e52 100644 --- a/src/payment/bolt12.rs +++ b/src/payment/bolt12.rs @@ -13,6 +13,7 @@ use crate::config::LDK_PAYMENT_RETRY_TIMEOUT; use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; +use crate::runtime::Runtime; use crate::types::{ChannelManager, PaymentStore}; use lightning::ln::channelmanager::{PaymentId, Retry}; @@ -25,7 +26,7 @@ use lightning::util::string::UntrustedString; use rand::RngCore; use std::num::NonZeroU64; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// A payment handler allowing to create and pay [BOLT 12] offers and refunds. 
@@ -35,7 +36,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// [BOLT 12]: https://github.com/lightning/bolts/blob/master/12-offer-encoding.md /// [`Node::bolt12_payment`]: crate::Node::bolt12_payment pub struct Bolt12Payment { - runtime: Arc>>>, + runtime: Arc, channel_manager: Arc, payment_store: Arc, logger: Arc, @@ -43,9 +44,8 @@ pub struct Bolt12Payment { impl Bolt12Payment { pub(crate) fn new( - runtime: Arc>>>, - channel_manager: Arc, payment_store: Arc, - logger: Arc, + runtime: Arc, channel_manager: Arc, + payment_store: Arc, logger: Arc, ) -> Self { Self { runtime, channel_manager, payment_store, logger } } @@ -59,10 +59,10 @@ impl Bolt12Payment { pub fn send( &self, offer: &Offer, quantity: Option, payer_note: Option, ) -> Result { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } + let mut random_bytes = [0u8; 32]; rand::thread_rng().fill_bytes(&mut random_bytes); let payment_id = PaymentId(random_bytes); @@ -160,8 +160,7 @@ impl Bolt12Payment { pub fn send_using_amount( &self, offer: &Offer, amount_msat: u64, quantity: Option, payer_note: Option, ) -> Result { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index 046d66c69..e57606474 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -10,12 +10,13 @@ use crate::config::Config; use crate::error::Error; use crate::logger::{log_info, LdkLogger, Logger}; +use crate::runtime::Runtime; use crate::types::{ChannelManager, Wallet}; use crate::wallet::OnchainSendAmount; use bitcoin::{Address, Txid}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; #[cfg(not(feature = "uniffi"))] type FeeRate = bitcoin::FeeRate; @@ -41,7 +42,7 @@ macro_rules! 
maybe_map_fee_rate_opt { /// /// [`Node::onchain_payment`]: crate::Node::onchain_payment pub struct OnchainPayment { - runtime: Arc>>>, + runtime: Arc, wallet: Arc, channel_manager: Arc, config: Arc, @@ -50,8 +51,8 @@ pub struct OnchainPayment { impl OnchainPayment { pub(crate) fn new( - runtime: Arc>>>, wallet: Arc, - channel_manager: Arc, config: Arc, logger: Arc, + runtime: Arc, wallet: Arc, channel_manager: Arc, + config: Arc, logger: Arc, ) -> Self { Self { runtime, wallet, channel_manager, config, logger } } @@ -75,8 +76,7 @@ impl OnchainPayment { pub fn send_to_address( &self, address: &bitcoin::Address, amount_sats: u64, fee_rate: Option, ) -> Result { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } @@ -106,8 +106,7 @@ impl OnchainPayment { pub fn send_all_to_address( &self, address: &bitcoin::Address, retain_reserves: bool, fee_rate: Option, ) -> Result { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } diff --git a/src/payment/spontaneous.rs b/src/payment/spontaneous.rs index 1508b6cd8..ec84fcace 100644 --- a/src/payment/spontaneous.rs +++ b/src/payment/spontaneous.rs @@ -12,6 +12,7 @@ use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::payment::SendingParameters; +use crate::runtime::Runtime; use crate::types::{ChannelManager, CustomTlvRecord, KeysManager, PaymentStore}; use lightning::ln::channelmanager::{PaymentId, RecipientOnionFields, Retry, RetryableSendFailure}; @@ -22,7 +23,7 @@ use lightning_types::payment::{PaymentHash, PaymentPreimage}; use bitcoin::secp256k1::PublicKey; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; // The default `final_cltv_expiry_delta` we apply when not set. const LDK_DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 144; @@ -33,7 +34,7 @@ const LDK_DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 144; /// /// [`Node::spontaneous_payment`]: crate::Node::spontaneous_payment pub struct SpontaneousPayment { - runtime: Arc>>>, + runtime: Arc, channel_manager: Arc, keys_manager: Arc, payment_store: Arc, @@ -43,9 +44,9 @@ pub struct SpontaneousPayment { impl SpontaneousPayment { pub(crate) fn new( - runtime: Arc>>>, - channel_manager: Arc, keys_manager: Arc, - payment_store: Arc, config: Arc, logger: Arc, + runtime: Arc, channel_manager: Arc, + keys_manager: Arc, payment_store: Arc, config: Arc, + logger: Arc, ) -> Self { Self { runtime, channel_manager, keys_manager, payment_store, config, logger } } @@ -72,8 +73,7 @@ impl SpontaneousPayment { &self, amount_msat: u64, node_id: PublicKey, sending_parameters: Option, custom_tlvs: Option>, ) -> Result { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } @@ -180,8 +180,7 @@ impl SpontaneousPayment { /// /// [`Bolt11Payment::send_probes`]: crate::payment::Bolt11Payment pub fn send_probes(&self, amount_msat: u64, node_id: PublicKey) -> Result<(), Error> { - let rt_lock = self.runtime.read().unwrap(); - if rt_lock.is_none() { + if !self.runtime.is_running() { return Err(Error::NotRunning); } diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 000000000..dd6e2c122 --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,161 @@ +// This file is Copyright its original authors, visible in version control history. 
+// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +#[cfg(tokio_unstable)] +use crate::logger::log_trace; +use crate::logger::{log_error, LdkLogger, Logger}; + +use tokio::task::JoinHandle; + +use std::fmt; +use std::future::Future; +use std::sync::{Arc, RwLock}; + +pub(crate) struct Runtime { + state: RwLock, + logger: Arc, +} + +impl Runtime { + pub fn new(logger: Arc) -> Self { + let state = RwLock::new(RuntimeState::Stopped); + Self { state, logger } + } + + pub fn start(&self) -> Result<(), RuntimeError> { + let mut state_lock = self.state.write().unwrap(); + if !matches!(*state_lock, RuntimeState::Stopped) { + return Err(RuntimeError::AlreadyRunning); + } + match tokio::runtime::Handle::try_current() { + Ok(handle) => *state_lock = RuntimeState::Handle(handle), + Err(_) => { + let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().map_err( + |e| { + log_error!(self.logger, "Failed to setup tokio runtime: {}", e); + RuntimeError::SetupFailed + }, + )?; + *state_lock = RuntimeState::Owned(rt) + }, + } + Ok(()) + } + + pub fn start_from_handle(&self, handle: tokio::runtime::Handle) -> Result<(), RuntimeError> { + let mut state_lock = self.state.write().unwrap(); + if !matches!(*state_lock, RuntimeState::Stopped) { + return Err(RuntimeError::AlreadyRunning); + } + *state_lock = RuntimeState::Handle(handle); + Ok(()) + } + + pub fn stop(&self) -> Result<(), RuntimeError> { + let mut state_lock = self.state.write().unwrap(); + if matches!(*state_lock, RuntimeState::Stopped) { + return Err(RuntimeError::NotRunning); + } + + let old_state = core::mem::replace(&mut *state_lock, RuntimeState::Stopped); + match old_state { + RuntimeState::Owned(rt) => { + #[cfg(tokio_unstable)] + log_trace!( + self.logger, + "Active runtime tasks left prior to shutdown: {}", + rt.metrics().active_tasks_count() + ); + rt.shutdown_background(); + }, + RuntimeState::Handle(_handle) => { + #[cfg(tokio_unstable)] + log_trace!( + self.logger, + "Active runtime tasks left prior to shutdown: {}", + _handle.metrics().active_tasks_count() + ); + }, + RuntimeState::Stopped => return Err(RuntimeError::NotRunning), + } + + Ok(()) + } + + pub fn is_running(&self) -> bool { + !matches!(*self.state.read().unwrap(), RuntimeState::Stopped) + } + + pub fn spawn(&self, future: F) -> Result, RuntimeError> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let handle = self.handle()?; + Ok(handle.spawn(future)) + } + + pub fn spawn_blocking(&self, func: F) -> Result, RuntimeError> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let handle = self.handle()?; + Ok(handle.spawn_blocking(func)) + } + + pub fn block_on(&self, future: F) -> Result { + let handle = self.handle()?; + Ok(tokio::task::block_in_place(move || handle.block_on(future))) + } + + fn handle(&self) -> Result { + match &*self.state.read().unwrap() { + RuntimeState::Owned(rt) => Ok(rt.handle().clone()), + RuntimeState::Handle(handle) => Ok(handle.clone()), + RuntimeState::Stopped => Err(RuntimeError::NotRunning), + } + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + let _ = self.stop(); + } +} + +enum RuntimeState { + Owned(tokio::runtime::Runtime), + Handle(tokio::runtime::Handle), + Stopped, +} + +#[derive(Debug)] +pub(crate) enum RuntimeError { + SetupFailed, + AlreadyRunning, + NotRunning, +} + +impl std::error::Error for RuntimeError {} + +impl 
fmt::Display for RuntimeError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::AlreadyRunning => write!(f, "Runtime already running."), + Self::NotRunning => write!(f, "Runtime is not running."), + Self::SetupFailed => write!(f, "Failed to setup runtime."), + } + } +} + +impl From for std::io::Error { + fn from(runtime_error: RuntimeError) -> Self { + let msg = format!("Runtime error: {}", runtime_error); + std::io::Error::new(std::io::ErrorKind::Other, msg) + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index ded88d35c..3ab528ef5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1287,3 +1287,15 @@ fn facade_logging() { validate_log_entry(entry); } } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_start_stop_drop_in_runtime_context() { + let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + + { + let config = random_config(true); + let node = setup_node(&chain_source, config, None); + node.stop().unwrap(); + } +} From 6476ae1b4c6d4bec8d5181555c552a0164937cee Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 23 May 2025 09:53:52 +0200 Subject: [PATCH 03/10] f Add `debug_assert` in Electrum broadcast method --- src/chain/electrum.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index eece892a6..e8b0970d2 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -190,6 +190,10 @@ impl ElectrumRuntimeClient { { spawn_fut } else { + debug_assert!( + false, + "Failed to broadcast due to runtime being unavailable. This should never happen." + ); return; }; From 8bb51ad1129f2dd4b3ddb0eeed696bd7722a4d0a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 23 May 2025 12:49:25 +0200 Subject: [PATCH 04/10] f Switch to runtime building on initialization --- bindings/ldk_node.udl | 2 +- src/builder.rs | 42 ++++++++++- src/chain/electrum.rs | 22 ++---- src/error.rs | 15 ---- src/event.rs | 12 +-- src/gossip.rs | 21 ++---- src/lib.rs | 92 +++++++++++------------ src/liquidity.rs | 8 +- src/payment/bolt11.rs | 18 +++-- src/payment/bolt12.rs | 15 ++-- src/payment/onchain.rs | 15 ++-- src/payment/spontaneous.rs | 15 ++-- src/runtime.rs | 149 ++++++++----------------------------- 13 files changed, 168 insertions(+), 258 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index a85c2eca7..9ee66d2f1 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -252,7 +252,6 @@ interface LSPS1Liquidity { enum NodeError { "AlreadyRunning", "NotRunning", - "RuntimeSetupFailed", "OnchainTxCreationFailed", "ConnectionFailed", "InvoiceCreationFailed", @@ -331,6 +330,7 @@ enum BuildError { "InvalidListeningAddresses", "InvalidAnnouncementAddresses", "InvalidNodeAlias", + "RuntimeSetupFailed", "ReadFailed", "WriteFailed", "StoragePathAccessFailed", diff --git a/src/builder.rs b/src/builder.rs index 77f643550..cd2bf76d5 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -155,6 +155,8 @@ pub enum BuildError { InvalidAnnouncementAddresses, /// The provided alias is invalid. InvalidNodeAlias, + /// An attempt to setup a runtime has failed. + RuntimeSetupFailed, /// We failed to read data from the [`KVStore`]. 
/// /// [`KVStore`]: lightning::util::persist::KVStore @@ -192,6 +194,7 @@ impl fmt::Display for BuildError { Self::InvalidAnnouncementAddresses => { write!(f, "Given announcement addresses are invalid.") }, + Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."), Self::ReadFailed => write!(f, "Failed to read from store."), Self::WriteFailed => write!(f, "Failed to write to store."), Self::StoragePathAccessFailed => write!(f, "Failed to access the given storage path."), @@ -223,6 +226,7 @@ pub struct NodeBuilder { gossip_source_config: Option, liquidity_source_config: Option, log_writer_config: Option, + runtime_handle: Option, } impl NodeBuilder { @@ -239,6 +243,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let runtime_handle = None; Self { config, entropy_source_config, @@ -246,9 +251,19 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + runtime_handle, } } + /// Configures the [`Node`] instance to (re-)use a specific `tokio` runtime. + /// + /// If not provided, the node will spawn its own runtime or reuse any outer runtime context it + /// can detect. + pub fn set_runtime(&mut self, runtime_handle: tokio::runtime::Handle) -> &mut Self { + self.runtime_handle = Some(runtime_handle); + self + } + /// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk. /// /// If the given file does not exist a new random seed file will be generated and @@ -583,6 +598,15 @@ impl NodeBuilder { ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; + let runtime = if let Some(handle) = self.runtime_handle.as_ref() { + Arc::new(Runtime::with_handle(handle.clone())) + } else { + Arc::new(Runtime::new().map_err(|e| { + log_error!(logger, "Failed to setup tokio runtime: {}", e); + BuildError::RuntimeSetupFailed + })?) + }; + let seed_bytes = seed_bytes_from_config( &self.config, self.entropy_source_config.as_ref(), @@ -611,6 +635,7 @@ impl NodeBuilder { self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, + runtime, logger, Arc::new(vss_store), ) @@ -620,6 +645,15 @@ impl NodeBuilder { pub fn build_with_store(&self, kv_store: Arc) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; + let runtime = if let Some(handle) = self.runtime_handle.as_ref() { + Arc::new(Runtime::with_handle(handle.clone())) + } else { + Arc::new(Runtime::new().map_err(|e| { + log_error!(logger, "Failed to setup tokio runtime: {}", e); + BuildError::RuntimeSetupFailed + })?) 
+ }; + let seed_bytes = seed_bytes_from_config( &self.config, self.entropy_source_config.as_ref(), @@ -633,6 +667,7 @@ impl NodeBuilder { self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, + runtime, logger, kv_store, ) @@ -935,7 +970,7 @@ fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], - logger: Arc, kv_store: Arc, + runtime: Arc, logger: Arc, kv_store: Arc, ) -> Result { if let Err(err) = may_announce_channel(&config) { if config.announcement_addresses.is_some() { @@ -1102,8 +1137,6 @@ fn build_with_store_internal( }, }; - let runtime = Arc::new(Runtime::new(Arc::clone(&logger))); - // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&chain_source)), @@ -1496,6 +1529,8 @@ fn build_with_store_internal( let (stop_sender, _) = tokio::sync::watch::channel(()); let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); + let is_running = Arc::new(RwLock::new(false)); + Ok(Node { runtime, stop_sender, @@ -1521,6 +1556,7 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + is_running, is_listening, node_metrics, }) diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index e8b0970d2..dd6a49296 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -88,7 +88,7 @@ impl ElectrumRuntimeClient { let now = Instant::now(); let tx_sync = Arc::clone(&self.tx_sync); - let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables))?; + let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables)); let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut); @@ -130,7 +130,7 @@ impl ElectrumRuntimeClient { BDK_ELECTRUM_CLIENT_BATCH_SIZE, true, ) - })?; + }); let wallet_sync_timeout_fut = tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut); @@ -159,7 +159,7 @@ impl ElectrumRuntimeClient { let spawn_fut = self.runtime.spawn_blocking(move || { bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true) - })?; + }); let wallet_sync_timeout_fut = tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut); @@ -185,18 +185,8 @@ impl ElectrumRuntimeClient { let txid = tx.compute_txid(); let tx_bytes = tx.encode(); - let spawn_fut = if let Ok(spawn_fut) = - self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx)) - { - spawn_fut - } else { - debug_assert!( - false, - "Failed to broadcast due to runtime being unavailable. This should never happen." - ); - return; - }; - + let spawn_fut = + self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx)); let timeout_fut = tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut); @@ -242,7 +232,7 @@ impl ElectrumRuntimeClient { batch.estimate_fee(num_blocks); } - let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch))?; + let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch)); let timeout_fut = tokio::time::timeout( Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), diff --git a/src/error.rs b/src/error.rs index 4f7a7bde3..2cb71186d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,8 +5,6 @@ // http://opensource.org/licenses/MIT>, at your option. 
You may not use this file except in // accordance with one or both of these licenses. -use crate::runtime::RuntimeError; - use bdk_chain::bitcoin::psbt::ExtractTxError as BdkExtractTxError; use bdk_chain::local_chain::CannotConnectError as BdkChainConnectionError; use bdk_chain::tx_graph::CalculateFeeError as BdkChainCalculateFeeError; @@ -22,8 +20,6 @@ pub enum Error { AlreadyRunning, /// Returned when trying to stop [`crate::Node`] while it is not running. NotRunning, - /// An attempt to setup a runtime has failed. - RuntimeSetupFailed, /// An on-chain transaction could not be created. OnchainTxCreationFailed, /// A network connection has been closed. @@ -131,7 +127,6 @@ impl fmt::Display for Error { match *self { Self::AlreadyRunning => write!(f, "Node is already running."), Self::NotRunning => write!(f, "Node is not running."), - Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."), Self::OnchainTxCreationFailed => { write!(f, "On-chain transaction could not be created.") }, @@ -204,16 +199,6 @@ impl fmt::Display for Error { impl std::error::Error for Error {} -impl From for Error { - fn from(runtime_error: RuntimeError) -> Self { - match runtime_error { - RuntimeError::SetupFailed => Self::RuntimeSetupFailed, - RuntimeError::AlreadyRunning => Self::AlreadyRunning, - RuntimeError::NotRunning => Self::NotRunning, - } - } -} - impl From for Error { fn from(_: BdkSignerError) -> Self { Self::OnchainTxSigningFailed diff --git a/src/event.rs b/src/event.rs index 2b372ed8c..21770d91f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -29,7 +29,7 @@ use crate::io::{ }; use crate::logger::{log_debug, log_error, log_info, LdkLogger}; -use crate::runtime::{Runtime, RuntimeError}; +use crate::runtime::Runtime; use lightning::events::bump_transaction::BumpTransactionEvent; use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent}; @@ -1058,10 +1058,7 @@ where forwarding_channel_manager.process_pending_htlc_forwards(); }; - if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) { - log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); - debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen."); - } + self.runtime.spawn(future); }, LdkEvent::SpendableOutputs { outputs, channel_id } => { match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) { @@ -1441,10 +1438,7 @@ where } } }; - if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) { - log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); - debug_assert!(false, "Tried spawing a future while the runtime wasn't available. 
This should never happen."); - } + self.runtime.spawn(future); }, LdkEvent::BumpTransaction(bte) => { match bte { diff --git a/src/gossip.rs b/src/gossip.rs index 18166fd3f..1185f0718 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -7,8 +7,8 @@ use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; -use crate::logger::{log_error, log_trace, LdkLogger, Logger}; -use crate::runtime::{Runtime, RuntimeError}; +use crate::logger::{log_trace, LdkLogger, Logger}; +use crate::runtime::Runtime; use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup}; use crate::Error; @@ -22,7 +22,6 @@ use std::time::Duration; pub(crate) enum GossipSource { P2PNetwork { gossip_sync: Arc, - logger: Arc, }, RapidGossipSync { gossip_sync: Arc, @@ -39,7 +38,7 @@ impl GossipSource { None::>, Arc::clone(&logger), )); - Self::P2PNetwork { gossip_sync, logger } + Self::P2PNetwork { gossip_sync } } pub fn new_rgs( @@ -67,9 +66,9 @@ impl GossipSource { runtime: Arc, ) { match self { - Self::P2PNetwork { gossip_sync, logger } => { + Self::P2PNetwork { gossip_sync } => { if let Some(utxo_source) = chain_source.as_utxo_source() { - let spawner = RuntimeSpawner::new(Arc::clone(&runtime), Arc::clone(&logger)); + let spawner = RuntimeSpawner::new(Arc::clone(&runtime)); let gossip_verifier = Arc::new(GossipVerifier::new( utxo_source, spawner, @@ -135,20 +134,16 @@ impl GossipSource { pub(crate) struct RuntimeSpawner { runtime: Arc, - logger: Arc, } impl RuntimeSpawner { - pub(crate) fn new(runtime: Arc, logger: Arc) -> Self { - Self { runtime, logger } + pub(crate) fn new(runtime: Arc) -> Self { + Self { runtime } } } impl FutureSpawner for RuntimeSpawner { fn spawn + Send + 'static>(&self, future: T) { - if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) { - log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); - debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen."); - } + self.runtime.spawn(future); } } diff --git a/src/lib.rs b/src/lib.rs index 4032dbdc0..f9f33e66f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -// This file is Copyright its original authors, visible in version control history. +// This file is Copyright its original authors, visible in version control history.lib // // This file is licensed under the Apache License, Version 2.0 or the MIT license >, peer_store: Arc>>, payment_store: Arc, + is_running: Arc>, is_listening: Arc, node_metrics: Arc>, } @@ -216,26 +217,12 @@ impl Node { /// After this returns, the [`Node`] instance can be controlled via the provided API methods in /// a thread-safe manner. pub fn start(&self) -> Result<(), Error> { - self.runtime.start()?; - self.start_inner() - } - - /// Starts the necessary background tasks (such as handling events coming from user input, - /// LDK/BDK, and the peer-to-peer network) on the the given `runtime`. - /// - /// This allows to have LDK Node to specify an outer pre-existing runtime, e.g., to avoid - /// stacking Tokio runtime contexts. Note we require the runtime to be of the `multithreaded` - /// flavor. - /// - /// After this returns, the [`Node`] instance can be controlled via the provided API methods in - /// a thread-safe manner. 
- pub fn start_with_runtime(&self, handle: tokio::runtime::Handle) -> Result<(), Error> { - self.runtime.start_from_handle(handle)?; - self.start_inner() - } - - fn start_inner(&self) -> Result<(), Error> { // Acquire a run lock and hold it until we're setup. + let mut is_running_lock = self.is_running.write().unwrap(); + if *is_running_lock { + return Err(Error::AlreadyRunning); + } + log_info!( self.logger, "Starting up LDK Node with node ID {} on network: {}", @@ -251,7 +238,7 @@ impl Node { // Block to ensure we update our fee rate cache once on startup let chain_source = Arc::clone(&self.chain_source); - self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })??; + self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })?; // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); @@ -263,7 +250,7 @@ impl Node { chain_source .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper) .await; - })?; + }); if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); @@ -312,7 +299,7 @@ impl Node { } } } - })?; + }); } if let Some(listening_addresses) = &self.config.listening_addresses { @@ -376,7 +363,7 @@ impl Node { } listening_indicator.store(false, Ordering::Release); - })?; + }); } // Regularly reconnect to persisted peers. @@ -413,7 +400,7 @@ impl Node { } } } - })?; + }); // Regularly broadcast node announcements. let bcast_cm = Arc::clone(&self.channel_manager); @@ -496,7 +483,7 @@ impl Node { } } } - })?; + }); } let mut stop_tx_bcast = self.stop_sender.subscribe(); @@ -520,7 +507,7 @@ impl Node { } } } - })?; + }); let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), @@ -612,7 +599,7 @@ impl Node { debug_assert!(false); }, } - })?; + }); if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); @@ -631,10 +618,11 @@ impl Node { _ = liquidity_handler.handle_next_event() => {} } } - })?; + }); } log_info!(self.logger, "Startup complete."); + *is_running_lock = true; Ok(()) } @@ -642,7 +630,8 @@ impl Node { /// /// After this returns most API methods will return [`Error::NotRunning`]. pub fn stop(&self) -> Result<(), Error> { - if !self.runtime.is_running() { + let mut is_running_lock = self.is_running.write().unwrap(); + if !*is_running_lock { return Err(Error::NotRunning); } @@ -680,7 +669,7 @@ impl Node { event_handling_stopped_receiver.changed(), ) .await - })?; + }); match timeout_res { Ok(stop_res) => match stop_res { @@ -703,15 +692,24 @@ impl Node { }, } - self.runtime.stop()?; + #[cfg(tokio_unstable)] + { + let runtime_handle = self.runtime.handle(); + log_trace!( + self.logger, + "Active runtime tasks left prior to shutdown: {}", + runtime_handle.metrics().active_tasks_count() + ); + } log_info!(self.logger, "Shutdown complete."); + *is_running_lock = false; Ok(()) } /// Returns the status of the [`Node`]. 
pub fn status(&self) -> NodeStatus { - let is_running = self.runtime.is_running(); + let is_running = *self.is_running.read().unwrap(); let is_listening = self.is_listening.load(Ordering::Acquire); let current_best_block = self.channel_manager.current_best_block().into(); let locked_node_metrics = self.node_metrics.read().unwrap(); @@ -832,6 +830,7 @@ impl Node { Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), Arc::clone(&self.config), + Arc::clone(&self.is_running), Arc::clone(&self.logger), ) } @@ -849,6 +848,7 @@ impl Node { Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), Arc::clone(&self.config), + Arc::clone(&self.is_running), Arc::clone(&self.logger), )) } @@ -859,9 +859,9 @@ impl Node { #[cfg(not(feature = "uniffi"))] pub fn bolt12_payment(&self) -> Bolt12Payment { Bolt12Payment::new( - Arc::clone(&self.runtime), Arc::clone(&self.channel_manager), Arc::clone(&self.payment_store), + Arc::clone(&self.is_running), Arc::clone(&self.logger), ) } @@ -872,9 +872,9 @@ impl Node { #[cfg(feature = "uniffi")] pub fn bolt12_payment(&self) -> Arc { Arc::new(Bolt12Payment::new( - Arc::clone(&self.runtime), Arc::clone(&self.channel_manager), Arc::clone(&self.payment_store), + Arc::clone(&self.is_running), Arc::clone(&self.logger), )) } @@ -883,11 +883,11 @@ impl Node { #[cfg(not(feature = "uniffi"))] pub fn spontaneous_payment(&self) -> SpontaneousPayment { SpontaneousPayment::new( - Arc::clone(&self.runtime), Arc::clone(&self.channel_manager), Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), Arc::clone(&self.config), + Arc::clone(&self.is_running), Arc::clone(&self.logger), ) } @@ -896,11 +896,11 @@ impl Node { #[cfg(feature = "uniffi")] pub fn spontaneous_payment(&self) -> Arc { Arc::new(SpontaneousPayment::new( - Arc::clone(&self.runtime), Arc::clone(&self.channel_manager), Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), Arc::clone(&self.config), + Arc::clone(&self.is_running), Arc::clone(&self.logger), )) } @@ -909,10 +909,10 @@ impl Node { #[cfg(not(feature = "uniffi"))] pub fn onchain_payment(&self) -> OnchainPayment { OnchainPayment::new( - Arc::clone(&self.runtime), Arc::clone(&self.wallet), Arc::clone(&self.channel_manager), Arc::clone(&self.config), + Arc::clone(&self.is_running), Arc::clone(&self.logger), ) } @@ -921,10 +921,10 @@ impl Node { #[cfg(feature = "uniffi")] pub fn onchain_payment(&self) -> Arc { Arc::new(OnchainPayment::new( - Arc::clone(&self.runtime), Arc::clone(&self.wallet), Arc::clone(&self.channel_manager), Arc::clone(&self.config), + Arc::clone(&self.is_running), Arc::clone(&self.logger), )) } @@ -1002,7 +1002,7 @@ impl Node { pub fn connect( &self, node_id: PublicKey, address: SocketAddress, persist: bool, ) -> Result<(), Error> { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -1016,7 +1016,7 @@ impl Node { // connection futures going forward. self.runtime.block_on(async move { con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })??; + })?; log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.node_id, peer_info.address); @@ -1032,7 +1032,7 @@ impl Node { /// Will also remove the peer from the peer store, i.e., after this has been called we won't /// try to reconnect on restart. 
pub fn disconnect(&self, counterparty_node_id: PublicKey) -> Result<(), Error> { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -1054,7 +1054,7 @@ impl Node { push_to_counterparty_msat: Option, channel_config: Option, announce_for_forwarding: bool, ) -> Result { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -1082,7 +1082,7 @@ impl Node { // connection futures going forward. self.runtime.block_on(async move { con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })??; + })?; // Fail if we have less than the channel value + anchor reserve available (if applicable). let init_features = self @@ -1230,7 +1230,7 @@ impl Node { /// /// [`EsploraSyncConfig::background_sync_config`]: crate::config::EsploraSyncConfig::background_sync_config pub fn sync_wallets(&self) -> Result<(), Error> { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -1258,7 +1258,7 @@ impl Node { }, } Ok(()) - })? + }) } /// Close a previously opened channel. diff --git a/src/liquidity.rs b/src/liquidity.rs index 57cba64e1..e93f33abf 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -1427,7 +1427,7 @@ impl LSPS1Liquidity { // connection futures going forward. self.runtime.block_on(async move { con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })??; + })?; log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); @@ -1444,7 +1444,7 @@ impl LSPS1Liquidity { refund_address, ) .await - })??; + })?; Ok(response) } @@ -1465,12 +1465,12 @@ impl LSPS1Liquidity { // connection futures going forward. self.runtime.block_on(async move { con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })??; + })?; let liquidity_source = Arc::clone(&liquidity_source); let response = self .runtime - .block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await })??; + .block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await })?; Ok(response) } } diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index 3049d8726..e526fc055 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -38,7 +38,7 @@ use lightning_invoice::Bolt11InvoiceDescription as LdkBolt11InvoiceDescription; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; #[cfg(not(feature = "uniffi"))] type Bolt11Invoice = LdkBolt11Invoice; @@ -95,6 +95,7 @@ pub struct Bolt11Payment { payment_store: Arc, peer_store: Arc>>, config: Arc, + is_running: Arc>, logger: Arc, } @@ -104,7 +105,7 @@ impl Bolt11Payment { connection_manager: Arc>>, liquidity_source: Option>>>, payment_store: Arc, peer_store: Arc>>, - config: Arc, logger: Arc, + config: Arc, is_running: Arc>, logger: Arc, ) -> Self { Self { runtime, @@ -114,6 +115,7 @@ impl Bolt11Payment { payment_store, peer_store, config, + is_running, logger, } } @@ -126,7 +128,7 @@ impl Bolt11Payment { &self, invoice: &Bolt11Invoice, sending_parameters: Option, ) -> Result { let invoice = maybe_convert_invoice(invoice); - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -234,7 +236,7 @@ impl Bolt11Payment { sending_parameters: Option, ) -> Result { let invoice = maybe_convert_invoice(invoice); - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -657,7 +659,7 @@ impl 
Bolt11Payment { // connection futures going forward. self.runtime.block_on(async move { con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })??; + })?; log_info!(self.logger, "Connected to LSP {}@{}. ", peer_info.node_id, peer_info.address); @@ -684,7 +686,7 @@ impl Bolt11Payment { .await .map(|(invoice, prop_fee)| (invoice, None, Some(prop_fee))) } - })??; + })?; // Register payment in payment store. let payment_hash = PaymentHash(invoice.payment_hash().to_byte_array()); @@ -733,7 +735,7 @@ impl Bolt11Payment { /// amount times [`Config::probing_liquidity_limit_multiplier`] won't be used to send /// pre-flight probes. pub fn send_probes(&self, invoice: &Bolt11Invoice) -> Result<(), Error> { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -766,7 +768,7 @@ impl Bolt11Payment { pub fn send_probes_using_amount( &self, invoice: &Bolt11Invoice, amount_msat: u64, ) -> Result<(), Error> { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } diff --git a/src/payment/bolt12.rs b/src/payment/bolt12.rs index d2d2a9e52..8024d3f7c 100644 --- a/src/payment/bolt12.rs +++ b/src/payment/bolt12.rs @@ -13,7 +13,6 @@ use crate::config::LDK_PAYMENT_RETRY_TIMEOUT; use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; -use crate::runtime::Runtime; use crate::types::{ChannelManager, PaymentStore}; use lightning::ln::channelmanager::{PaymentId, Retry}; @@ -26,7 +25,7 @@ use lightning::util::string::UntrustedString; use rand::RngCore; use std::num::NonZeroU64; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// A payment handler allowing to create and pay [BOLT 12] offers and refunds. @@ -36,18 +35,18 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// [BOLT 12]: https://github.com/lightning/bolts/blob/master/12-offer-encoding.md /// [`Node::bolt12_payment`]: crate::Node::bolt12_payment pub struct Bolt12Payment { - runtime: Arc, channel_manager: Arc, payment_store: Arc, + is_running: Arc>, logger: Arc, } impl Bolt12Payment { pub(crate) fn new( - runtime: Arc, channel_manager: Arc, - payment_store: Arc, logger: Arc, + channel_manager: Arc, payment_store: Arc, + is_running: Arc>, logger: Arc, ) -> Self { - Self { runtime, channel_manager, payment_store, logger } + Self { channel_manager, payment_store, is_running, logger } } /// Send a payment given an offer. 
@@ -59,7 +58,7 @@ impl Bolt12Payment { pub fn send( &self, offer: &Offer, quantity: Option, payer_note: Option, ) -> Result { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -160,7 +159,7 @@ impl Bolt12Payment { pub fn send_using_amount( &self, offer: &Offer, amount_msat: u64, quantity: Option, payer_note: Option, ) -> Result { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index e57606474..2614e55ce 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -10,13 +10,12 @@ use crate::config::Config; use crate::error::Error; use crate::logger::{log_info, LdkLogger, Logger}; -use crate::runtime::Runtime; use crate::types::{ChannelManager, Wallet}; use crate::wallet::OnchainSendAmount; use bitcoin::{Address, Txid}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; #[cfg(not(feature = "uniffi"))] type FeeRate = bitcoin::FeeRate; @@ -42,19 +41,19 @@ macro_rules! maybe_map_fee_rate_opt { /// /// [`Node::onchain_payment`]: crate::Node::onchain_payment pub struct OnchainPayment { - runtime: Arc, wallet: Arc, channel_manager: Arc, config: Arc, + is_running: Arc>, logger: Arc, } impl OnchainPayment { pub(crate) fn new( - runtime: Arc, wallet: Arc, channel_manager: Arc, - config: Arc, logger: Arc, + wallet: Arc, channel_manager: Arc, config: Arc, + is_running: Arc>, logger: Arc, ) -> Self { - Self { runtime, wallet, channel_manager, config, logger } + Self { wallet, channel_manager, config, is_running, logger } } /// Retrieve a new on-chain/funding address. @@ -76,7 +75,7 @@ impl OnchainPayment { pub fn send_to_address( &self, address: &bitcoin::Address, amount_sats: u64, fee_rate: Option, ) -> Result { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -106,7 +105,7 @@ impl OnchainPayment { pub fn send_all_to_address( &self, address: &bitcoin::Address, retain_reserves: bool, fee_rate: Option, ) -> Result { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } diff --git a/src/payment/spontaneous.rs b/src/payment/spontaneous.rs index ec84fcace..cdac80ff7 100644 --- a/src/payment/spontaneous.rs +++ b/src/payment/spontaneous.rs @@ -12,7 +12,6 @@ use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::payment::SendingParameters; -use crate::runtime::Runtime; use crate::types::{ChannelManager, CustomTlvRecord, KeysManager, PaymentStore}; use lightning::ln::channelmanager::{PaymentId, RecipientOnionFields, Retry, RetryableSendFailure}; @@ -23,7 +22,7 @@ use lightning_types::payment::{PaymentHash, PaymentPreimage}; use bitcoin::secp256k1::PublicKey; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; // The default `final_cltv_expiry_delta` we apply when not set. 
const LDK_DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 144; @@ -34,21 +33,21 @@ const LDK_DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 144; /// /// [`Node::spontaneous_payment`]: crate::Node::spontaneous_payment pub struct SpontaneousPayment { - runtime: Arc, channel_manager: Arc, keys_manager: Arc, payment_store: Arc, config: Arc, + is_running: Arc>, logger: Arc, } impl SpontaneousPayment { pub(crate) fn new( - runtime: Arc, channel_manager: Arc, - keys_manager: Arc, payment_store: Arc, config: Arc, + channel_manager: Arc, keys_manager: Arc, + payment_store: Arc, config: Arc, is_running: Arc>, logger: Arc, ) -> Self { - Self { runtime, channel_manager, keys_manager, payment_store, config, logger } + Self { channel_manager, keys_manager, payment_store, config, is_running, logger } } /// Send a spontaneous aka. "keysend", payment. @@ -73,7 +72,7 @@ impl SpontaneousPayment { &self, amount_msat: u64, node_id: PublicKey, sending_parameters: Option, custom_tlvs: Option>, ) -> Result { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } @@ -180,7 +179,7 @@ impl SpontaneousPayment { /// /// [`Bolt11Payment::send_probes`]: crate::payment::Bolt11Payment pub fn send_probes(&self, amount_msat: u64, node_id: PublicKey) -> Result<(), Error> { - if !self.runtime.is_running() { + if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); } diff --git a/src/runtime.rs b/src/runtime.rs index dd6e2c122..2baa364c0 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -5,157 +5,68 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -#[cfg(tokio_unstable)] -use crate::logger::log_trace; -use crate::logger::{log_error, LdkLogger, Logger}; - use tokio::task::JoinHandle; -use std::fmt; use std::future::Future; -use std::sync::{Arc, RwLock}; pub(crate) struct Runtime { - state: RwLock, - logger: Arc, + mode: RuntimeMode, } impl Runtime { - pub fn new(logger: Arc) -> Self { - let state = RwLock::new(RuntimeState::Stopped); - Self { state, logger } - } - - pub fn start(&self) -> Result<(), RuntimeError> { - let mut state_lock = self.state.write().unwrap(); - if !matches!(*state_lock, RuntimeState::Stopped) { - return Err(RuntimeError::AlreadyRunning); - } - match tokio::runtime::Handle::try_current() { - Ok(handle) => *state_lock = RuntimeState::Handle(handle), + pub fn new() -> Result { + let mode = match tokio::runtime::Handle::try_current() { + Ok(handle) => RuntimeMode::Handle(handle), Err(_) => { - let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().map_err( - |e| { - log_error!(self.logger, "Failed to setup tokio runtime: {}", e); - RuntimeError::SetupFailed - }, - )?; - *state_lock = RuntimeState::Owned(rt) + let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; + RuntimeMode::Owned(rt) }, - } - Ok(()) + }; + Ok(Self { mode }) } - pub fn start_from_handle(&self, handle: tokio::runtime::Handle) -> Result<(), RuntimeError> { - let mut state_lock = self.state.write().unwrap(); - if !matches!(*state_lock, RuntimeState::Stopped) { - return Err(RuntimeError::AlreadyRunning); - } - *state_lock = RuntimeState::Handle(handle); - Ok(()) + pub fn with_handle(handle: tokio::runtime::Handle) -> Self { + let mode = RuntimeMode::Handle(handle); + Self { mode } } - pub fn stop(&self) -> Result<(), RuntimeError> { - let mut state_lock = self.state.write().unwrap(); - if matches!(*state_lock, RuntimeState::Stopped) { - return 
Err(RuntimeError::NotRunning); - } - - let old_state = core::mem::replace(&mut *state_lock, RuntimeState::Stopped); - match old_state { - RuntimeState::Owned(rt) => { - #[cfg(tokio_unstable)] - log_trace!( - self.logger, - "Active runtime tasks left prior to shutdown: {}", - rt.metrics().active_tasks_count() - ); - rt.shutdown_background(); - }, - RuntimeState::Handle(_handle) => { - #[cfg(tokio_unstable)] - log_trace!( - self.logger, - "Active runtime tasks left prior to shutdown: {}", - _handle.metrics().active_tasks_count() - ); - }, - RuntimeState::Stopped => return Err(RuntimeError::NotRunning), - } - - Ok(()) - } - - pub fn is_running(&self) -> bool { - !matches!(*self.state.read().unwrap(), RuntimeState::Stopped) - } - - pub fn spawn(&self, future: F) -> Result, RuntimeError> + pub fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { - let handle = self.handle()?; - Ok(handle.spawn(future)) + let handle = self.handle(); + handle.spawn(future) } - pub fn spawn_blocking(&self, func: F) -> Result, RuntimeError> + pub fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let handle = self.handle()?; - Ok(handle.spawn_blocking(func)) + let handle = self.handle(); + handle.spawn_blocking(func) } - pub fn block_on(&self, future: F) -> Result { - let handle = self.handle()?; - Ok(tokio::task::block_in_place(move || handle.block_on(future))) + pub fn block_on(&self, future: F) -> F::Output { + // While we generally decided not to overthink via which call graph users would enter our + // runtime context, we'd still try to reuse whatever current context would be present + // during `block_on`, as this is the context `block_in_place` would operate on. So we try + // to detect the outer context here, and otherwise use whatever was set during + // initialization. 
+ let handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle()); + tokio::task::block_in_place(move || handle.block_on(future)) } - fn handle(&self) -> Result { - match &*self.state.read().unwrap() { - RuntimeState::Owned(rt) => Ok(rt.handle().clone()), - RuntimeState::Handle(handle) => Ok(handle.clone()), - RuntimeState::Stopped => Err(RuntimeError::NotRunning), + pub fn handle(&self) -> tokio::runtime::Handle { + match &self.mode { + RuntimeMode::Owned(rt) => rt.handle().clone(), + RuntimeMode::Handle(handle) => handle.clone(), } } } -impl Drop for Runtime { - fn drop(&mut self) { - let _ = self.stop(); - } -} - -enum RuntimeState { +enum RuntimeMode { Owned(tokio::runtime::Runtime), Handle(tokio::runtime::Handle), - Stopped, -} - -#[derive(Debug)] -pub(crate) enum RuntimeError { - SetupFailed, - AlreadyRunning, - NotRunning, -} - -impl std::error::Error for RuntimeError {} - -impl fmt::Display for RuntimeError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Self::AlreadyRunning => write!(f, "Runtime already running."), - Self::NotRunning => write!(f, "Runtime is not running."), - Self::SetupFailed => write!(f, "Failed to setup runtime."), - } - } -} - -impl From for std::io::Error { - fn from(runtime_error: RuntimeError) -> Self { - let msg = format!("Runtime error: {}", runtime_error); - std::io::Error::new(std::io::ErrorKind::Other, msg) - } } From 22e2e1cc09ffdca4ae3343a4724534e735ab01b7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 23 May 2025 13:51:15 +0200 Subject: [PATCH 05/10] f Rexport `tokio` and allow dead code in `uniffi` builder method --- src/builder.rs | 1 + src/lib.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/builder.rs b/src/builder.rs index cd2bf76d5..e6ebdb47a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -259,6 +259,7 @@ impl NodeBuilder { /// /// If not provided, the node will spawn its own runtime or reuse any outer runtime context it /// can detect. + #[cfg_attr(feature = "uniffi", allow(dead_code))] pub fn set_runtime(&mut self, runtime_handle: tokio::runtime::Handle) -> &mut Self { self.runtime_handle = Some(runtime_handle); self diff --git a/src/lib.rs b/src/lib.rs index f9f33e66f..e142c6a71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,6 +107,7 @@ pub use lightning; pub use lightning_invoice; pub use lightning_liquidity; pub use lightning_types; +pub use tokio; pub use vss_client; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; From 5530016f44623254a254a40f713fb82ae581f85f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 23 May 2025 13:59:59 +0200 Subject: [PATCH 06/10] f oops --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index e142c6a71..4747eb77d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -// This file is Copyright its original authors, visible in version control history.lib +// This file is Copyright its original authors, visible in version control history. // // This file is licensed under the Apache License, Version 2.0 or the MIT license Date: Thu, 22 May 2025 11:17:24 +0200 Subject: [PATCH 07/10] Use `crate::runtime::Runtime` for `VssStore` .. which now gives us cleaner reuse/handling of outer runtime contexts, cleanup on `Drop`, etc. 
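
For illustration only (not part of this patch): a rough sketch of how a `VssStore` is
now wired up against the shared runtime. The base URL, store id, and seed below are
placeholders, `FixedHeaders` is the provider the tests use, and `VssStore`/`Runtime`
are the crate-internal types touched by this series, so this is an internal usage
sketch rather than public API:

    use std::collections::HashMap;
    use std::sync::Arc;

    use vss_client::headers::FixedHeaders;

    use crate::io::vss_store::VssStore;
    use crate::runtime::Runtime;

    // The runtime is typically the one the builder already set up, i.e.,
    // `Arc::new(Runtime::new()..)` or `Arc::new(Runtime::with_handle(handle))`.
    fn build_vss_store(runtime: Arc<Runtime>) -> VssStore {
        let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
        VssStore::new(
            "https://example.com/vss".to_string(), // placeholder base URL
            "store-id".to_string(),                // placeholder store id
            [0u8; 32],                             // placeholder VSS seed
            header_provider,
            runtime,
        )
    }

Every blocking `KVStore` call then goes through `Runtime::block_on`, which prefers
`tokio::runtime::Handle::try_current()` and only falls back to the handle captured at
construction, so the store behaves the same whether or not it is driven from within an
existing runtime context.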
--- src/builder.rs | 5 +-- src/io/vss_store.rs | 107 +++++++++++++++++++++++++------------------- 2 files changed, 61 insertions(+), 51 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index e6ebdb47a..63c41f227 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -626,10 +626,7 @@ impl NodeBuilder { let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes(); let vss_store = - VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| { - log_error!(logger, "Failed to setup VssStore: {}", e); - BuildError::KVStoreSetupFailed - })?; + VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime)); build_with_store_internal( config, self.chain_data_source_config.as_ref(), diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 296eaabe3..e2cfc3c7b 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -6,6 +6,8 @@ // accordance with one or both of these licenses. use crate::io::utils::check_namespace_key_validity; +use crate::runtime::Runtime; + use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; use lightning::io::{self, Error, ErrorKind}; use lightning::util::persist::KVStore; @@ -15,7 +17,6 @@ use rand::RngCore; use std::panic::RefUnwindSafe; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Runtime; use vss_client::client::VssClient; use vss_client::error::VssError; use vss_client::headers::VssHeaderProvider; @@ -41,7 +42,7 @@ type CustomRetryPolicy = FilteredRetryPolicy< pub struct VssStore { client: VssClient, store_id: String, - runtime: Runtime, + runtime: Arc, storable_builder: StorableBuilder, key_obfuscator: KeyObfuscator, } @@ -49,9 +50,8 @@ pub struct VssStore { impl VssStore { pub(crate) fn new( base_url: String, store_id: String, vss_seed: [u8; 32], - header_provider: Arc, - ) -> io::Result { - let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; + header_provider: Arc, runtime: Arc, + ) -> Self { let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); let key_obfuscator = KeyObfuscator::new(obfuscation_master_key); @@ -70,7 +70,7 @@ impl VssStore { }) as _); let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); - Ok(Self { client, store_id, runtime, storable_builder, key_obfuscator }) + Self { client, store_id, runtime, storable_builder, key_obfuscator } } fn build_key( @@ -136,19 +136,16 @@ impl KVStore for VssStore { store_id: self.store_id.clone(), key: self.build_key(primary_namespace, secondary_namespace, key)?, }; - - let resp = - tokio::task::block_in_place(|| self.runtime.block_on(self.client.get_object(&request))) - .map_err(|e| { - let msg = format!( - "Failed to read from key {}/{}/{}: {}", - primary_namespace, secondary_namespace, key, e - ); - match e { - VssError::NoSuchKeyError(..) => Error::new(ErrorKind::NotFound, msg), - _ => Error::new(ErrorKind::Other, msg), - } - })?; + let resp = self.runtime.block_on(self.client.get_object(&request)).map_err(|e| { + let msg = format!( + "Failed to read from key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + match e { + VssError::NoSuchKeyError(..) 
=> Error::new(ErrorKind::NotFound, msg), + _ => Error::new(ErrorKind::Other, msg), + } + })?; // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { @@ -179,14 +176,13 @@ impl KVStore for VssStore { delete_items: vec![], }; - tokio::task::block_in_place(|| self.runtime.block_on(self.client.put_object(&request))) - .map_err(|e| { - let msg = format!( - "Failed to write to key {}/{}/{}: {}", - primary_namespace, secondary_namespace, key, e - ); - Error::new(ErrorKind::Other, msg) - })?; + self.runtime.block_on(self.client.put_object(&request)).map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + })?; Ok(()) } @@ -204,30 +200,29 @@ impl KVStore for VssStore { }), }; - tokio::task::block_in_place(|| self.runtime.block_on(self.client.delete_object(&request))) - .map_err(|e| { - let msg = format!( - "Failed to delete key {}/{}/{}: {}", - primary_namespace, secondary_namespace, key, e - ); - Error::new(ErrorKind::Other, msg) - })?; + self.runtime.block_on(self.client.delete_object(&request)).map_err(|e| { + let msg = format!( + "Failed to delete key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + })?; Ok(()) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; - let keys = tokio::task::block_in_place(|| { - self.runtime.block_on(self.list_all_keys(primary_namespace, secondary_namespace)) - }) - .map_err(|e| { - let msg = format!( - "Failed to retrieve keys in namespace: {}/{} : {}", - primary_namespace, secondary_namespace, e - ); - Error::new(ErrorKind::Other, msg) - })?; + let keys = self + .runtime + .block_on(self.list_all_keys(primary_namespace, secondary_namespace)) + .map_err(|e| { + let msg = format!( + "Failed to retrieve keys in namespace: {}/{} : {}", + primary_namespace, secondary_namespace, e + ); + Error::new(ErrorKind::Other, msg) + })?; Ok(keys) } @@ -266,10 +261,27 @@ mod tests { use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng, RngCore}; use std::collections::HashMap; + use tokio::runtime; use vss_client::headers::FixedHeaders; #[test] - fn read_write_remove_list_persist() { + fn vss_read_write_remove_list_persist() { + let runtime = Arc::new(Runtime::new().unwrap()); + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let mut rng = thread_rng(); + let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + let mut vss_seed = [0u8; 32]; + rng.fill_bytes(&mut vss_seed); + let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let vss_store = + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + + do_read_write_remove_list_persist(&vss_store); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn vss_read_write_remove_list_persist_in_runtime_context() { + let runtime = Arc::new(Runtime::new().unwrap()); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let mut rng = thread_rng(); let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); @@ -277,8 +289,9 @@ mod tests { rng.fill_bytes(&mut vss_seed); 
let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); do_read_write_remove_list_persist(&vss_store); + drop(vss_store) } } From bfeee00f6fa5511e759adfd6acbf984a5deb1930 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 23 May 2025 15:05:41 +0200 Subject: [PATCH 08/10] DROPME try --- .github/workflows/vss-integration.yml | 2 +- src/io/vss_store.rs | 25 +++++++++++++++++++++++-- src/runtime.rs | 9 +++++++++ tests/common/logging.rs | 12 ++++++++---- tests/integration_tests_rust.rs | 2 +- tests/integration_tests_vss.rs | 14 ++++++++++++-- 6 files changed, 54 insertions(+), 10 deletions(-) diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 2a6c63704..d265d682b 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -75,7 +75,7 @@ jobs: cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" RUSTFLAGS="--cfg vss_test" cargo build --verbose --color always - RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss + RUSTFLAGS="--cfg vss_test --cfg tokio_unstable" cargo test --test integration_tests_vss -- --nocapture - name: Cleanup run: | diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index e2cfc3c7b..0b0c02c40 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -136,6 +136,8 @@ impl KVStore for VssStore { store_id: self.store_id.clone(), key: self.build_key(primary_namespace, secondary_namespace, key)?, }; + + println!("READ: {}/{}/{}", primary_namespace, secondary_namespace, key); let resp = self.runtime.block_on(self.client.get_object(&request)).map_err(|e| { let msg = format!( "Failed to read from key {}/{}/{}: {}", @@ -146,6 +148,7 @@ impl KVStore for VssStore { _ => Error::new(ErrorKind::Other, msg), } })?; + println!("READ DONE: {}/{}/{}", primary_namespace, secondary_namespace, key); // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { @@ -176,14 +179,28 @@ impl KVStore for VssStore { delete_items: vec![], }; - self.runtime.block_on(self.client.put_object(&request)).map_err(|e| { + println!("WRITE: {}/{}/{}", primary_namespace, secondary_namespace, key); + let res = self.runtime.block_on(async move { + tokio::time::timeout(Duration::from_secs(100), self.client.put_object(&request)) + .await + .map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + }) + }); + + println!("WRITE DONE: {}/{}/{}: {:?}", primary_namespace, secondary_namespace, key, res); + + res?.map_err(|e| { let msg = format!( "Failed to write to key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e ); Error::new(ErrorKind::Other, msg) })?; - Ok(()) } @@ -200,6 +217,7 @@ impl KVStore for VssStore { }), }; + println!("REMOVE: {}/{}/{}", primary_namespace, secondary_namespace, key); self.runtime.block_on(self.client.delete_object(&request)).map_err(|e| { let msg = format!( "Failed to delete key {}/{}/{}: {}", @@ -207,12 +225,14 @@ impl KVStore for VssStore { ); Error::new(ErrorKind::Other, msg) })?; + println!("REMOVE DONE: {}/{}/{}", primary_namespace, 
secondary_namespace, key); Ok(()) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + println!("LIST: {}/{}", primary_namespace, secondary_namespace); let keys = self .runtime .block_on(self.list_all_keys(primary_namespace, secondary_namespace)) @@ -223,6 +243,7 @@ impl KVStore for VssStore { ); Error::new(ErrorKind::Other, msg) })?; + println!("LIST DONE: {}/{}", primary_namespace, secondary_namespace); Ok(keys) } diff --git a/src/runtime.rs b/src/runtime.rs index 2baa364c0..dcda74e63 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -55,6 +55,15 @@ impl Runtime { // to detect the outer context here, and otherwise use whatever was set during // initialization. let handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle()); + #[cfg(tokio_unstable)] + { + println!("Tokio blocking queue depth: {}", handle.metrics().blocking_queue_depth()); + println!( + "Tokio num_blocking_threads {} / idle_blocking_threads {}", + handle.metrics().num_blocking_threads(), + handle.metrics().num_idle_blocking_threads() + ); + } tokio::task::block_in_place(move || handle.block_on(future)) } diff --git a/tests/common/logging.rs b/tests/common/logging.rs index 6bceac29a..6db6b3082 100644 --- a/tests/common/logging.rs +++ b/tests/common/logging.rs @@ -22,11 +22,12 @@ impl Default for TestLogWriter { pub(crate) struct MockLogFacadeLogger { logs: Arc>>, + prefix: String, } impl MockLogFacadeLogger { - pub fn new() -> Self { - Self { logs: Arc::new(Mutex::new(Vec::new())) } + pub fn new(prefix: String) -> Self { + Self { prefix, logs: Arc::new(Mutex::new(Vec::new())) } } pub fn retrieve_logs(&self) -> Vec { @@ -48,6 +49,7 @@ impl LogFacadeLog for MockLogFacadeLogger { record.line().unwrap(), record.args() ); + println!("{}: {}", self.prefix, message); self.logs.lock().unwrap().push(message); } @@ -95,8 +97,10 @@ impl<'a> From> for LogFacadeRecord<'a> { } } -pub(crate) fn init_log_logger(level: LogFacadeLevelFilter) -> Arc { - let logger = Arc::new(MockLogFacadeLogger::new()); +pub(crate) fn init_log_logger( + prefix: String, level: LogFacadeLevelFilter, +) -> Arc { + let logger = Arc::new(MockLogFacadeLogger::new(prefix)); log::set_boxed_logger(Box::new(logger.clone())).unwrap(); log::set_max_level(level); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 3ab528ef5..a2b138a93 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1275,7 +1275,7 @@ fn facade_logging() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); - let logger = init_log_logger(LevelFilter::Trace); + let logger = init_log_logger("".to_owned(), LevelFilter::Trace); let mut config = random_config(false); config.log_writer = TestLogWriter::LogFacade; diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 9d6ec158c..615bf8519 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -9,7 +9,9 @@ mod common; +use common::logging::{init_log_logger, TestLogWriter}; use ldk_node::Builder; +use log::LevelFilter; use std::collections::HashMap; #[test] @@ -17,9 +19,13 @@ fn channel_full_cycle_with_vss_store() { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); println!("== Node A =="); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config_a = common::random_config(true); + let mut 
config_a = common::random_config(true); + let prefix_a = "A".to_string(); + let logger_a = init_log_logger(prefix_a, LevelFilter::Trace); + config_a.log_writer = TestLogWriter::LogFacade; let mut builder_a = Builder::from_config(config_a.node_config); builder_a.set_chain_source_esplora(esplora_url.clone(), None); + builder_a.set_log_facade_logger(); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let node_a = builder_a .build_with_vss_store_and_fixed_headers( @@ -31,9 +37,13 @@ fn channel_full_cycle_with_vss_store() { node_a.start().unwrap(); println!("\n== Node B =="); - let config_b = common::random_config(true); + let mut config_b = common::random_config(true); + //let prefix_b = "B".to_string(); + //let logger_b = init_log_logger(prefix_b, LevelFilter::Trace); + //config_a.log_writer = TestLogWriter::LogFacade; let mut builder_b = Builder::from_config(config_b.node_config); builder_b.set_chain_source_esplora(esplora_url.clone(), None); + //builder_b.set_log_facade_logger(); let node_b = builder_b .build_with_vss_store_and_fixed_headers( vss_base_url, From 76793feab868bdfd1d998a7a083d0199e8eaa90b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 26 May 2025 16:38:04 +0200 Subject: [PATCH 09/10] DROPME: try vss client timeouts --- Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5ce10d6ad..6f812920c 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der serde_json = { version = "1.0.128", default-features = false, features = ["std"] } log = { version = "0.4.22", default-features = false, features = ["std"]} -vss-client = "0.3" +#vss-client = "0.3" +vss-client = { git = "https://github.com/tnull/vss-rust-client", branch = "2025-05-try-client-timeout" } prost = { version = "0.11.6", default-features = false} [target.'cfg(windows)'.dependencies] From 47207f532480dec7e2272483d2ede1a72d283780 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 27 May 2025 12:24:39 +0200 Subject: [PATCH 10/10] f DROPME TRY sync_wallets --- src/lib.rs | 68 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4747eb77d..778587d71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1239,27 +1239,55 @@ impl Node { let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); - self.runtime.block_on(async move { - match chain_source.as_ref() { - ChainSource::Esplora { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; - chain_source.sync_onchain_wallet().await?; - }, - ChainSource::Electrum { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; - chain_source.sync_onchain_wallet().await?; - }, - ChainSource::BitcoindRpc { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) - .await?; - }, - } - Ok(()) + tokio::task::block_in_place(move || { + tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( + async move { + match chain_source.as_ref() { + ChainSource::Esplora { .. 
} => { + chain_source.update_fee_rate_estimates().await?; + chain_source + .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) + .await?; + chain_source.sync_onchain_wallet().await?; + }, + ChainSource::Electrum { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source + .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) + .await?; + chain_source.sync_onchain_wallet().await?; + }, + ChainSource::BitcoindRpc { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source + .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) + .await?; + }, + } + Ok(()) + }) }) + //self.runtime.block_on(async move { + // match chain_source.as_ref() { + // ChainSource::Esplora { .. } => { + // chain_source.update_fee_rate_estimates().await?; + // chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; + // chain_source.sync_onchain_wallet().await?; + // }, + // ChainSource::Electrum { .. } => { + // chain_source.update_fee_rate_estimates().await?; + // chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; + // chain_source.sync_onchain_wallet().await?; + // }, + // ChainSource::BitcoindRpc { .. } => { + // chain_source.update_fee_rate_estimates().await?; + // chain_source + // .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) + // .await?; + // }, + // } + // Ok(()) + //}) } /// Close a previously opened channel.