Skip to content

Introduce Runtime object allowng to detect outer runtime context #543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/vss-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
cd ldk-node
export TEST_VSS_BASE_URL="http://localhost:8080/vss"
RUSTFLAGS="--cfg vss_test" cargo build --verbose --color always
RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss
RUSTFLAGS="--cfg vss_test --cfg tokio_unstable" cargo test --test integration_tests_vss -- --nocapture

- name: Cleanup
run: |
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
log = { version = "0.4.22", default-features = false, features = ["std"]}

vss-client = "0.3"
#vss-client = "0.3"
vss-client = { git = "https://github.com/tnull/vss-rust-client", branch = "2025-05-try-client-timeout" }
prost = { version = "0.11.6", default-features = false}

[target.'cfg(windows)'.dependencies]
Expand Down
7 changes: 4 additions & 3 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ dictionary LogRecord {

[Trait, WithForeign]
interface LogWriter {
void log(LogRecord record);
void log(LogRecord record);
};

interface Builder {
Expand Down Expand Up @@ -160,8 +160,8 @@ interface Node {

[Enum]
interface Bolt11InvoiceDescription {
Hash(string hash);
Direct(string description);
Hash(string hash);
Direct(string description);
};

interface Bolt11Payment {
Expand Down Expand Up @@ -330,6 +330,7 @@ enum BuildError {
"InvalidListeningAddresses",
"InvalidAnnouncementAddresses",
"InvalidNodeAlias",
"RuntimeSetupFailed",
"ReadFailed",
"WriteFailed",
"StoragePathAccessFailed",
Expand Down
49 changes: 42 additions & 7 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::liquidity::{
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::peer_store::PeerStore;
use crate::runtime::Runtime;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
Expand Down Expand Up @@ -154,6 +155,8 @@ pub enum BuildError {
InvalidAnnouncementAddresses,
/// The provided alias is invalid.
InvalidNodeAlias,
/// An attempt to setup a runtime has failed.
RuntimeSetupFailed,
/// We failed to read data from the [`KVStore`].
///
/// [`KVStore`]: lightning::util::persist::KVStore
Expand Down Expand Up @@ -191,6 +194,7 @@ impl fmt::Display for BuildError {
Self::InvalidAnnouncementAddresses => {
write!(f, "Given announcement addresses are invalid.")
},
Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."),
Self::ReadFailed => write!(f, "Failed to read from store."),
Self::WriteFailed => write!(f, "Failed to write to store."),
Self::StoragePathAccessFailed => write!(f, "Failed to access the given storage path."),
Expand Down Expand Up @@ -222,6 +226,7 @@ pub struct NodeBuilder {
gossip_source_config: Option<GossipSourceConfig>,
liquidity_source_config: Option<LiquiditySourceConfig>,
log_writer_config: Option<LogWriterConfig>,
runtime_handle: Option<tokio::runtime::Handle>,
}

impl NodeBuilder {
Expand All @@ -238,16 +243,28 @@ impl NodeBuilder {
let gossip_source_config = None;
let liquidity_source_config = None;
let log_writer_config = None;
let runtime_handle = None;
Self {
config,
entropy_source_config,
chain_data_source_config,
gossip_source_config,
liquidity_source_config,
log_writer_config,
runtime_handle,
}
}

/// Configures the [`Node`] instance to (re-)use a specific `tokio` runtime.
///
/// If not provided, the node will spawn its own runtime or reuse any outer runtime context it
/// can detect.
#[cfg_attr(feature = "uniffi", allow(dead_code))]
pub fn set_runtime(&mut self, runtime_handle: tokio::runtime::Handle) -> &mut Self {
self.runtime_handle = Some(runtime_handle);
self
}

/// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk.
///
/// If the given file does not exist a new random seed file will be generated and
Expand Down Expand Up @@ -582,6 +599,15 @@ impl NodeBuilder {
) -> Result<Node, BuildError> {
let logger = setup_logger(&self.log_writer_config, &self.config)?;

let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
Arc::new(Runtime::with_handle(handle.clone()))
} else {
Arc::new(Runtime::new().map_err(|e| {
log_error!(logger, "Failed to setup tokio runtime: {}", e);
BuildError::RuntimeSetupFailed
})?)
};

let seed_bytes = seed_bytes_from_config(
&self.config,
self.entropy_source_config.as_ref(),
Expand All @@ -600,16 +626,14 @@ impl NodeBuilder {
let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();

let vss_store =
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
log_error!(logger, "Failed to setup VssStore: {}", e);
BuildError::KVStoreSetupFailed
})?;
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime));
build_with_store_internal(
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
runtime,
logger,
Arc::new(vss_store),
)
Expand All @@ -619,6 +643,15 @@ impl NodeBuilder {
pub fn build_with_store(&self, kv_store: Arc<DynStore>) -> Result<Node, BuildError> {
let logger = setup_logger(&self.log_writer_config, &self.config)?;

let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
Arc::new(Runtime::with_handle(handle.clone()))
} else {
Arc::new(Runtime::new().map_err(|e| {
log_error!(logger, "Failed to setup tokio runtime: {}", e);
BuildError::RuntimeSetupFailed
})?)
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code reuse opportunity

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NACK, IMO it's much more readable to keep short blocks like this inlined, instead of having the reader jump all over the file, losing context.

Copy link
Contributor

@joostjager joostjager May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree. I think eliminating the risk of future code changes not being applied in both places is more important than having the reader jump to a function. The function can have a descriptive name too such as build_runtime. I don't think it is bad for readability at all.

Your argument of jumping all over the file would also apply to code that isn't necessarily duplicated. Because also in that case, the reader would have to jump. I think that would lead to long function bodies, of which there are already too many in ldk, and those absolutely do not help with readability.


let seed_bytes = seed_bytes_from_config(
&self.config,
self.entropy_source_config.as_ref(),
Expand All @@ -632,6 +665,7 @@ impl NodeBuilder {
self.gossip_source_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
runtime,
logger,
kv_store,
)
Expand Down Expand Up @@ -934,7 +968,7 @@ fn build_with_store_internal(
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
gossip_source_config: Option<&GossipSourceConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
logger: Arc<Logger>, kv_store: Arc<DynStore>,
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
if let Err(err) = may_announce_channel(&config) {
if config.announcement_addresses.is_some() {
Expand Down Expand Up @@ -1101,8 +1135,6 @@ fn build_with_store_internal(
},
};

let runtime = Arc::new(RwLock::new(None));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&chain_source)),
Expand Down Expand Up @@ -1495,6 +1527,8 @@ fn build_with_store_internal(
let (stop_sender, _) = tokio::sync::watch::channel(());
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());

let is_running = Arc::new(RwLock::new(false));

Ok(Node {
runtime,
stop_sender,
Expand All @@ -1520,6 +1554,7 @@ fn build_with_store_internal(
scorer,
peer_store,
payment_store,
is_running,
is_listening,
node_metrics,
})
Expand Down
7 changes: 3 additions & 4 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::fee_estimator::{
ConfirmationTarget,
};
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;

use lightning::chain::{Confirm, Filter, WatchedOutput};
use lightning::util::ser::Writeable;
Expand Down Expand Up @@ -46,15 +47,14 @@ pub(crate) struct ElectrumRuntimeClient {
electrum_client: Arc<ElectrumClient>,
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
tx_sync: Arc<ElectrumSyncClient<Arc<Logger>>>,
runtime: Arc<tokio::runtime::Runtime>,
runtime: Arc<Runtime>,
config: Arc<Config>,
logger: Arc<Logger>,
}

impl ElectrumRuntimeClient {
pub(crate) fn new(
server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
logger: Arc<Logger>,
server_url: String, runtime: Arc<Runtime>, config: Arc<Config>, logger: Arc<Logger>,
) -> Result<Self, Error> {
let electrum_config = ElectrumConfigBuilder::new()
.retry(ELECTRUM_CLIENT_NUM_RETRIES)
Expand Down Expand Up @@ -187,7 +187,6 @@ impl ElectrumRuntimeClient {

let spawn_fut =
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));

let timeout_fut =
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);

Expand Down
5 changes: 3 additions & 2 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::fee_estimator::{
};
use crate::io::utils::write_node_metrics;
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};

Expand Down Expand Up @@ -126,7 +127,7 @@ impl ElectrumRuntimeStatus {
}

pub(crate) fn start(
&mut self, server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
&mut self, server_url: String, runtime: Arc<Runtime>, config: Arc<Config>,
logger: Arc<Logger>,
) -> Result<(), Error> {
match self {
Expand Down Expand Up @@ -311,7 +312,7 @@ impl ChainSource {
}
}

pub(crate) fn start(&self, runtime: Arc<tokio::runtime::Runtime>) -> Result<(), Error> {
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
match self {
Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => {
electrum_runtime_status.write().unwrap().start(
Expand Down
69 changes: 32 additions & 37 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::io::{
};
use crate::logger::{log_debug, log_error, log_info, LdkLogger};

use crate::runtime::Runtime;

use lightning::events::bump_transaction::BumpTransactionEvent;
use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
use lightning::events::{Event as LdkEvent, PaymentFailureReason};
Expand All @@ -53,7 +55,7 @@ use core::future::Future;
use core::task::{Poll, Waker};
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// An event emitted by [`Node`], which should be handled by the user.
Expand Down Expand Up @@ -451,7 +453,7 @@ where
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
payment_store: Arc<PaymentStore>,
peer_store: Arc<PeerStore<L>>,
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
runtime: Arc<Runtime>,
logger: L,
config: Arc<Config>,
}
Expand All @@ -466,8 +468,8 @@ where
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
logger: L, config: Arc<Config>,
) -> Self {
Self {
event_queue,
Expand Down Expand Up @@ -1049,17 +1051,14 @@ where
let forwarding_channel_manager = self.channel_manager.clone();
let min = time_forwardable.as_millis() as u64;

let runtime_lock = self.runtime.read().unwrap();
debug_assert!(runtime_lock.is_some());
let future = async move {
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;

if let Some(runtime) = runtime_lock.as_ref() {
runtime.spawn(async move {
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
forwarding_channel_manager.process_pending_htlc_forwards();
};

forwarding_channel_manager.process_pending_htlc_forwards();
});
}
self.runtime.spawn(future);
},
LdkEvent::SpendableOutputs { outputs, channel_id } => {
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
Expand Down Expand Up @@ -1419,31 +1418,27 @@ where
debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
},
LdkEvent::ConnectionNeeded { node_id, addresses } => {
let runtime_lock = self.runtime.read().unwrap();
debug_assert!(runtime_lock.is_some());

if let Some(runtime) = runtime_lock.as_ref() {
let spawn_logger = self.logger.clone();
let spawn_cm = Arc::clone(&self.connection_manager);
runtime.spawn(async move {
for addr in &addresses {
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
Ok(()) => {
return;
},
Err(e) => {
log_error!(
spawn_logger,
"Failed to establish connection to peer {}@{}: {}",
node_id,
addr,
e
);
},
}
let spawn_logger = self.logger.clone();
let spawn_cm = Arc::clone(&self.connection_manager);
let future = async move {
for addr in &addresses {
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
Ok(()) => {
return;
},
Err(e) => {
log_error!(
spawn_logger,
"Failed to establish connection to peer {}@{}: {}",
node_id,
addr,
e
);
},
}
});
}
}
};
self.runtime.spawn(future);
},
LdkEvent::BumpTransaction(bte) => {
match bte {
Expand Down
Loading
Loading