From 252b2d30d759d89d8563610337f4164bca21f8f2 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Mon, 27 Jan 2025 16:27:41 +0100
Subject: [PATCH 1/5] refactor hashmap to channelliquidities struct

Wrap the liquidities hash map in a struct so that decay and
serialization functionality can be attached to it. This allows external
data to be deserialized into the struct and decayed to make it
comparable and mergeable.
---
 lightning/src/routing/scoring.rs | 118 ++++++++++++++++++++++---------
 1 file changed, 86 insertions(+), 32 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 431f1597c17..60ca5ccc054 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -57,13 +57,14 @@ use crate::routing::router::{Path, CandidateRouteHop, PublicHopCandidate};
 use crate::routing::log_approx;
 use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use crate::util::logger::Logger;
-
 use crate::prelude::*;
+use crate::prelude::hash_map::Entry;
 use core::{cmp, fmt};
 use core::ops::{Deref, DerefMut};
 use core::time::Duration;
 use crate::io::{self, Read};
 use crate::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, DirectedHistoricalLiquidityTracker, HistoricalLiquidityTracker};
 #[cfg(not(c_bindings))]
 use {
 	core::cell::{RefCell, RefMut, Ref},
@@ -474,7 +475,86 @@ where L::Target: Logger {
 	decay_params: ProbabilisticScoringDecayParameters,
 	network_graph: G,
 	logger: L,
-	channel_liquidities: HashMap<u64, ChannelLiquidity>,
+	channel_liquidities: ChannelLiquidities,
+}
+
+/// Container for live and historical liquidity bounds for each channel.
+pub struct ChannelLiquidities(HashMap<u64, ChannelLiquidity>);
+
+impl ChannelLiquidities {
+	fn new() -> Self {
+		Self(new_hash_map())
+	}
+
+	fn time_passed(&mut self, duration_since_epoch: Duration, decay_params: ProbabilisticScoringDecayParameters) {
+		self.0.retain(|_scid, liquidity| {
+			liquidity.min_liquidity_offset_msat =
+				liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
+			liquidity.max_liquidity_offset_msat =
+				liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
+			liquidity.last_updated = duration_since_epoch;
+
+			// Only decay the historical buckets if there hasn't been new data for a while. Fixed
+			// half-lives for scoring data are inherently flawed: they tend to be either too fast or
+			// too slow. Ideally, historical buckets should decay only as new data is added, which
+			// happens naturally when fresh data arrives. However, scoring a channel based on
+			// month-old data as if it were minute-old data is problematic, so we apply a slow decay
+			// that activates only when no new data has been received for a while.
+			let elapsed_time =
+				duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
+			if elapsed_time > decay_params.historical_no_updates_half_life {
+				let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
+				if half_life != 0.0 {
+					liquidity.liquidity_history.decay_buckets(elapsed_time.as_secs_f64() / half_life);
+					liquidity.offset_history_last_updated = duration_since_epoch;
+				}
+			}
+			liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
+				liquidity.liquidity_history.has_datapoints()
+		});
+	}
+
+	fn get(&self, short_channel_id: &u64) -> Option<&ChannelLiquidity> {
+		self.0.get(short_channel_id)
+	}
+
+	fn insert(&mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Option<ChannelLiquidity> {
+		self.0.insert(short_channel_id, liquidity)
+	}
+
+	fn iter(&self) -> impl Iterator<Item = (&u64, &ChannelLiquidity)> {
+		self.0.iter()
+	}
+
+	fn entry(&mut self, short_channel_id: u64) -> Entry<u64, ChannelLiquidity, RandomState> {
+		self.0.entry(short_channel_id)
+	}
+
+	#[cfg(test)]
+	fn get_mut(&mut self, short_channel_id: &u64) -> Option<&mut ChannelLiquidity> {
+		self.0.get_mut(short_channel_id)
+	}
+}
+
+impl Readable for ChannelLiquidities {
+	#[inline]
+	fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+		let mut channel_liquidities = new_hash_map();
+		read_tlv_fields!(r, {
+			(0, channel_liquidities, required),
+		});
+		Ok(ChannelLiquidities(channel_liquidities))
+	}
+}
+
+impl Writeable for ChannelLiquidities {
+	#[inline]
+	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+		write_tlv_fields!(w, {
+			(0, self.0, required),
+		});
+		Ok(())
+	}
 }

 /// Parameters for configuring [`ProbabilisticScorer`].
@@ -849,7 +929,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
 			decay_params,
 			network_graph,
 			logger,
-			channel_liquidities: new_hash_map(),
+			channel_liquidities: ChannelLiquidities::new(),
 		}
 	}

@@ -1603,26 +1683,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
 	}

 	fn time_passed(&mut self, duration_since_epoch: Duration) {
-		let decay_params = self.decay_params;
-		self.channel_liquidities.retain(|_scid, liquidity| {
-			liquidity.min_liquidity_offset_msat =
-				liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
-			liquidity.max_liquidity_offset_msat =
-				liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
-			liquidity.last_updated = duration_since_epoch;
-
-			let elapsed_time =
-				duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
-			if elapsed_time > decay_params.historical_no_updates_half_life {
-				let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
-				if half_life != 0.0 {
-					liquidity.liquidity_history.decay_buckets(elapsed_time.as_secs_f64() / half_life);
-					liquidity.offset_history_last_updated = duration_since_epoch;
-				}
-			}
-			liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
-				liquidity.liquidity_history.has_datapoints()
-		});
+		self.channel_liquidities.time_passed(duration_since_epoch, self.decay_params);
 	}
 }

@@ -2060,15 +2121,11 @@ mod bucketed_history {
 		}
 	}
 }
-use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, DirectedHistoricalLiquidityTracker, HistoricalLiquidityTracker};

 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Writeable for ProbabilisticScorer<G, L> where L::Target: Logger {
 	#[inline]
 	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
-		write_tlv_fields!(w, {
-			(0, self.channel_liquidities, required),
-		});
-		Ok(())
+		self.channel_liquidities.write(w)
 	}
 }

@@ -2079,10 +2136,7 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore
 		r: &mut R, args: (ProbabilisticScoringDecayParameters, G, L)
 	) -> Result<Self, DecodeError> {
 		let (decay_params, network_graph, logger) = args;
-		let mut channel_liquidities = new_hash_map();
-		read_tlv_fields!(r, {
-			(0, channel_liquidities, required),
-		});
+		let channel_liquidities = ChannelLiquidities::read(r)?;
 		Ok(Self {
 			decay_params,
 			network_graph,

From 311a083f673cebe5af76b584b02071909234d9a0 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Wed, 29 Jan 2025 17:14:05 +0100
Subject: [PATCH 2/5] add combined scorer

Add a new scorer that can combine local scores with scores coming in
from an external source. This allows light nodes with a limited view of
the network to improve payment success rates.
---
 lightning/src/routing/scoring.rs | 251 ++++++++++++++++++++++++++++++-
 1 file changed, 249 insertions(+), 2 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 60ca5ccc054..c15f846b6e0 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -478,6 +478,7 @@ where L::Target: Logger {
 	channel_liquidities: ChannelLiquidities,
 }

 /// Container for live and historical liquidity bounds for each channel.
+#[derive(Clone)]
 pub struct ChannelLiquidities(HashMap<u64, ChannelLiquidity>);

@@ -884,6 +885,7 @@ impl ProbabilisticScoringDecayParameters {
 /// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
 /// offset fields gives the opposite direction.
 #[repr(C)] // Force the fields in memory to be in the order we specify
+#[derive(Clone)]
 struct ChannelLiquidity {
 	/// Lower channel liquidity bound in terms of an offset from zero.
 	min_liquidity_offset_msat: u64,
@@ -1154,6 +1156,15 @@ impl ChannelLiquidity {
 		}
 	}

+	fn merge(&mut self, other: &Self) {
+		// Take the average of the min/max liquidity offsets.
+		self.min_liquidity_offset_msat = (self.min_liquidity_offset_msat + other.min_liquidity_offset_msat) / 2;
+		self.max_liquidity_offset_msat = (self.max_liquidity_offset_msat + other.max_liquidity_offset_msat) / 2;
+
+		// Merge historical liquidity data.
+		self.liquidity_history.merge(&other.liquidity_history);
+	}
+
 	/// Returns a view of the channel liquidity directed from `source` to `target` assuming
 	/// `capacity_msat`.
 	fn as_directed(
@@ -1687,6 +1698,99 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
 	}
 }

+/// A probabilistic scorer that combines local and external information to score channels. This
+/// scorer shadow-tracks the purely local scores so that external scores can be cleanly merged in
+/// when they become available.
+///
+/// This is useful for nodes that have a limited local view of the network and need to augment it
+/// with scores from an external source to improve payment reliability. The external source may
+/// use something like background probing to gather a more complete view of the network. Merging
+/// reduces the likelihood of losing unique local data on particular channels.
+///
+/// Note that only the locally acquired data is persisted. After a restart, the external scores
+/// will be lost and must be resupplied.
+pub struct CombinedScorer<G: Deref<Target = NetworkGraph<L>>, L: Deref> where L::Target: Logger {
+	local_only_scorer: ProbabilisticScorer<G, L>,
+	scorer: ProbabilisticScorer<G, L>,
+}
+
+impl<G: Deref<Target = NetworkGraph<L>> + Clone, L: Deref + Clone> CombinedScorer<G, L> where L::Target: Logger {
+	/// Create a new combined scorer with the given local scorer.
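+	///
+	/// The combined scorer initially mirrors the given local scorer: its working copy of the
+	/// liquidity data is a clone of the local scorer's state until external scores are merged in.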
+	pub fn new(local_scorer: ProbabilisticScorer<G, L>) -> Self {
+		let decay_params = local_scorer.decay_params;
+		let network_graph = local_scorer.network_graph.clone();
+		let logger = local_scorer.logger.clone();
+		let mut scorer = ProbabilisticScorer::new(decay_params, network_graph, logger);
+
+		scorer.channel_liquidities = local_scorer.channel_liquidities.clone();
+
+		Self {
+			local_only_scorer: local_scorer,
+			scorer,
+		}
+	}
+
+	/// Merge external channel liquidity information into the scorer.
+	pub fn merge(&mut self, mut external_scores: ChannelLiquidities, duration_since_epoch: Duration) {
+		// Decay both sets of scores to make them comparable and mergeable.
+		self.local_only_scorer.time_passed(duration_since_epoch);
+		external_scores.time_passed(duration_since_epoch, self.local_only_scorer.decay_params);
+
+		let local_scores = &self.local_only_scorer.channel_liquidities;
+
+		// For each channel, merge the external liquidity information with the isolated local
+		// liquidity information.
+		for (scid, mut liquidity) in external_scores.0 {
+			if let Some(local_liquidity) = local_scores.get(&scid) {
+				liquidity.merge(local_liquidity);
+			}
+			self.scorer.channel_liquidities.insert(scid, liquidity);
+		}
+	}
+}
+
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreLookUp for CombinedScorer<G, L> where L::Target: Logger {
+	type ScoreParams = ProbabilisticScoringFeeParameters;
+
+	fn channel_penalty_msat(
+		&self, candidate: &CandidateRouteHop, usage: ChannelUsage, score_params: &ProbabilisticScoringFeeParameters
+	) -> u64 {
+		self.scorer.channel_penalty_msat(candidate, usage, score_params)
+	}
+}
+
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for CombinedScorer<G, L> where L::Target: Logger {
+	fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) {
+		self.local_only_scorer.payment_path_failed(path, short_channel_id, duration_since_epoch);
+		self.scorer.payment_path_failed(path, short_channel_id, duration_since_epoch);
+	}
+
+	fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
+		self.local_only_scorer.payment_path_successful(path, duration_since_epoch);
+		self.scorer.payment_path_successful(path, duration_since_epoch);
+	}
+
+	fn probe_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) {
+		self.local_only_scorer.probe_failed(path, short_channel_id, duration_since_epoch);
+		self.scorer.probe_failed(path, short_channel_id, duration_since_epoch);
+	}
+
+	fn probe_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
+		self.local_only_scorer.probe_successful(path, duration_since_epoch);
+		self.scorer.probe_successful(path, duration_since_epoch);
+	}
+
+	fn time_passed(&mut self, duration_since_epoch: Duration) {
+		self.local_only_scorer.time_passed(duration_since_epoch);
+		self.scorer.time_passed(duration_since_epoch);
+	}
+}
+
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Writeable for CombinedScorer<G, L> where L::Target: Logger {
+	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), crate::io::Error> {
+		self.local_only_scorer.write(writer)
+	}
+}
+
 #[cfg(c_bindings)]
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Score for ProbabilisticScorer<G, L>
 where L::Target: Logger {}
@@ -1866,6 +1970,13 @@ mod bucketed_history {
 			self.buckets[bucket] = self.buckets[bucket].saturating_add(BUCKET_FIXED_POINT_ONE);
 		}
 	}
+
+		/// Averages this tracker's buckets element-wise with those of `other`, in place.
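+		/// Each pair of `u16` weights is combined as `(a + b) / 2` in `u32` arithmetic, so the
+		/// addition cannot overflow and the result always fits back into a `u16`.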
+		pub(crate) fn merge(&mut self, other: &Self) {
+			for (bucket, other_bucket) in self.buckets.iter_mut().zip(other.buckets.iter()) {
+				*bucket = ((*bucket as u32 + *other_bucket as u32) / 2) as u16;
+			}
+		}
 	}

 	impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
@@ -1962,6 +2073,13 @@ mod bucketed_history {
 			-> DirectedHistoricalLiquidityTracker<&'a mut HistoricalLiquidityTracker> {
 			DirectedHistoricalLiquidityTracker { source_less_than_target, tracker: self }
 		}
+
+		/// Merges the historical liquidity data from another tracker into this one.
+		pub fn merge(&mut self, other: &Self) {
+			self.min_liquidity_offset_history.merge(&other.min_liquidity_offset_history);
+			self.max_liquidity_offset_history.merge(&other.max_liquidity_offset_history);
+			self.recalculate_valid_point_count();
+		}
 	}

 	/// A set of buckets representing the history of where we've seen the minimum- and maximum-
@@ -2120,6 +2238,72 @@ mod bucketed_history {
 			Some((cumulative_success_prob * (1024.0 * 1024.0 * 1024.0)) as u64)
 		}
 	}
+
+	#[cfg(test)]
+	mod tests {
+		use crate::routing::scoring::ProbabilisticScoringFeeParameters;
+
+		use super::{HistoricalBucketRangeTracker, HistoricalLiquidityTracker};
+
+		#[test]
+		fn historical_liquidity_bucket_merge() {
+			let mut bucket1 = HistoricalBucketRangeTracker::new();
+			bucket1.track_datapoint(100, 1000);
+			assert_eq!(
+				bucket1.buckets,
+				[
+					0u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, 0, 0, 0
+				]
+			);
+
+			let mut bucket2 = HistoricalBucketRangeTracker::new();
+			bucket2.track_datapoint(0, 1000);
+			assert_eq!(
+				bucket2.buckets,
+				[
+					32u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, 0, 0, 0
+				]
+			);
+
+			bucket1.merge(&bucket2);
+			assert_eq!(
+				bucket1.buckets,
+				[
+					16u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, 0, 0, 0
+				]
+			);
+		}
+
+		#[test]
+		fn historical_liquidity_tracker_merge() {
+			let params = ProbabilisticScoringFeeParameters::default();
+
+			let probability1: Option<u64>;
+			let mut tracker1 = HistoricalLiquidityTracker::new();
+			{
+				let mut directed_tracker1 = tracker1.as_directed_mut(true);
+				directed_tracker1.track_datapoint(100, 200, 1000);
+				probability1 = directed_tracker1
+					.calculate_success_probability_times_billion(&params, 500, 1000);
+			}
+
+			let mut tracker2 = HistoricalLiquidityTracker::new();
+			{
+				let mut directed_tracker2 = tracker2.as_directed_mut(true);
+				directed_tracker2.track_datapoint(200, 300, 1000);
+			}
+
+			tracker1.merge(&tracker2);
+
+			let directed_tracker1 = tracker1.as_directed(true);
+			let probability =
+				directed_tracker1.calculate_success_probability_times_billion(&params, 500, 1000);
+
+			assert_ne!(probability1, probability);
+		}
+	}
 }

 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Writeable for ProbabilisticScorer<G, L> where L::Target: Logger {
@@ -2213,7 +2397,7 @@ impl Readable for ChannelLiquidity {
 #[cfg(test)]
 mod tests {
-	use super::{ChannelLiquidity, HistoricalLiquidityTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorer};
+	use super::{ChannelLiquidity, HistoricalLiquidityTracker, ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters};
 	use crate::blinded_path::BlindedHop;
 	use crate::util::config::UserConfig;
 	use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 	use crate::routing::gossip::{EffectiveCapacity, NetworkGraph, NodeId};
 	use crate::routing::router::{BlindedTail, Path, RouteHop, CandidateRouteHop, PublicHopCandidate};
-	use crate::routing::scoring::{ChannelUsage, ScoreLookUp, ScoreUpdate};
+	use crate::routing::scoring::{ChannelLiquidities, ChannelUsage, CombinedScorer, ScoreLookUp, ScoreUpdate};
 	use crate::util::ser::{ReadableArgs, Writeable};
 	use crate::util::test_utils::{self, TestLogger};

@@ -2231,6 +2415,7 @@ mod tests {
 	use bitcoin::network::Network;
 	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
 	use core::time::Duration;
+	use std::rc::Rc;
 	use crate::io;

 	fn source_privkey() -> SecretKey {
@@ -3722,6 +3907,68 @@ mod tests {
 		assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, amount_msat, &params, false),
 			Some(0.0));
 	}
+
+	#[test]
+	fn combined_scorer() {
+		let logger = TestLogger::new();
+		let network_graph = network_graph(&logger);
+		let params = ProbabilisticScoringFeeParameters::default();
+		let mut scorer = ProbabilisticScorer::new(
+			ProbabilisticScoringDecayParameters::default(),
+			&network_graph,
+			&logger,
+		);
+		scorer.payment_path_failed(&payment_path_for_amount(600), 42, Duration::ZERO);
+
+		let mut combined_scorer = CombinedScorer::new(scorer);
+
+		// Verify that the combined_scorer has the correct liquidity range after a failed 600 msat payment.
+		let liquidity_range =
+			combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
+		assert_eq!(liquidity_range.unwrap(), (0, 600));
+
+		let source = source_node_id();
+		let usage = ChannelUsage {
+			amount_msat: 750,
+			inflight_htlc_msat: 0,
+			effective_capacity: EffectiveCapacity::Total {
+				capacity_msat: 1_000,
+				htlc_maximum_msat: 1_000,
+			},
+		};
+
+		{
+			let network_graph = network_graph.read_only();
+			let channel = network_graph.channel(42).unwrap();
+			let (info, _) = channel.as_directed_from(&source).unwrap();
+			let candidate =
+				CandidateRouteHop::PublicHop(PublicHopCandidate { info, short_channel_id: 42 });
+
+			let penalty = combined_scorer.channel_penalty_msat(&candidate, usage, &params);
+
+			let mut external_liquidity = ChannelLiquidity::new(Duration::ZERO);
+			let logger_rc = Rc::new(&logger);
+			external_liquidity
+				.as_directed_mut(&source_node_id(), &target_node_id(), 1_000)
+				.successful(1000, Duration::ZERO, format_args!("test channel"), logger_rc.as_ref());
+
+			let mut external_scores = ChannelLiquidities::new();
+
+			external_scores.insert(42, external_liquidity);
+			combined_scorer.merge(external_scores, Duration::ZERO);
+
+			let penalty_after_merge =
+				combined_scorer.channel_penalty_msat(&candidate, usage, &params);
+
+			// Since the external source observed a successful payment, the penalty should be lower after the merge.
+			assert!(penalty_after_merge < penalty);
+		}
+
+		// Verify that the merged external success, which consumed the channel's estimated
+		// liquidity, tightened the upper bound of the liquidity range after the merge.
+		let liquidity_range =
+			combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
+		assert_eq!(liquidity_range.unwrap(), (0, 300));
+	}
 }

 #[cfg(ldk_bench)]

From bb468dd7ed4a19047fefa172cbbab28dde00327c Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Mon, 3 Feb 2025 09:46:20 +0100
Subject: [PATCH 3/5] fix historical liquidity bucket decay

The formula for applying half lives was incorrect. Test coverage added.
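Previously, decay_buckets scaled each bucket by 1024 / powf64(2048.0,
half_lives), which matches the intended 0.5^half_lives scaling only
when half_lives is exactly 1. For example, at two half-lives a weight
of 32 should decay to 32 * 0.25 = 8, but the old divisor of 2048^2
truncated it all the way to 0. The scaling factor is now derived
directly from powf64(0.5, half_lives).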
---
 lightning/src/routing/scoring.rs | 41 +++++++++++++++++++++++++-------
 1 file changed, 33 insertions(+), 8 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index c15f846b6e0..eb784b12e2b 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1977,13 +1977,21 @@ mod bucketed_history {
 				*bucket = ((*bucket as u32 + *other_bucket as u32) / 2) as u16;
 			}
 		}
+
+		/// Applies decay at the given half-life to all buckets.
+		fn decay(&mut self, half_lives: f64) {
+			let factor = (1024.0 * powf64(0.5, half_lives)) as u64;
+			for bucket in self.buckets.iter_mut() {
+				*bucket = ((*bucket as u64) * factor / 1024) as u16;
+			}
+		}
 	}

 	impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
 	impl_writeable_tlv_based!(LegacyHistoricalBucketRangeTracker, { (0, buckets, required) });

 	#[derive(Clone, Copy)]
-	#[repr(C)] // Force the fields in memory to be in the order we specify.
+	#[repr(C)]// Force the fields in memory to be in the order we specify.
 	pub(super) struct HistoricalLiquidityTracker {
 		// This struct sits inside a `(u64, ChannelLiquidity)` in memory, and we first read the
 		// liquidity offsets in `ChannelLiquidity` when calculating the non-historical score. This
@@ -2031,13 +2039,8 @@ mod bucketed_history {
 		}

 		pub(super) fn decay_buckets(&mut self, half_lives: f64) {
-			let divisor = powf64(2048.0, half_lives) as u64;
-			for bucket in self.min_liquidity_offset_history.buckets.iter_mut() {
-				*bucket = ((*bucket as u64) * 1024 / divisor) as u16;
-			}
-			for bucket in self.max_liquidity_offset_history.buckets.iter_mut() {
-				*bucket = ((*bucket as u64) * 1024 / divisor) as u16;
-			}
+			self.min_liquidity_offset_history.decay(half_lives);
+			self.max_liquidity_offset_history.decay(half_lives);
 			self.recalculate_valid_point_count();
 		}

@@ -2276,6 +2279,28 @@ mod bucketed_history {
 			);
 		}

+		#[test]
+		fn historical_liquidity_bucket_decay() {
+			let mut bucket = HistoricalBucketRangeTracker::new();
+			bucket.track_datapoint(100, 1000);
+			assert_eq!(
+				bucket.buckets,
+				[
+					0u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, 0, 0, 0
+				]
+			);
+
+			bucket.decay(2.0);
+			assert_eq!(
+				bucket.buckets,
+				[
+					0u16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, 0, 0, 0
+				]
+			);
+		}
+
 		#[test]
 		fn historical_liquidity_tracker_merge() {
 			let params = ProbabilisticScoringFeeParameters::default();

From 630246b13f5eece87dd2496df483a3c91aa7f16b Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Thu, 6 Feb 2025 15:48:32 +0100
Subject: [PATCH 4/5] add set_scores method on CombinedScorer to overwrite
 local data

This commit expands on the previously introduced merge method by
offering a way to simply replace the local scores with liquidity
information obtained from an external source.
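A rough sketch of the intended call pattern, assuming the external
scores arrive as serialized bytes (the fetch mechanism and variable
names are illustrative only):

    // `external_bytes` holds scores obtained from some external source.
    let external_scores = ChannelLiquidities::read(&mut &external_bytes[..])?;

    // Replace the scorer's working state wholesale rather than merging,
    // discarding any previously supplied external data.
    combined_scorer.set_scores(external_scores);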
---
 lightning/src/routing/scoring.rs | 43 ++++++++++++++++++++++++--------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index eb784b12e2b..9be66aebb76 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -59,7 +59,7 @@ use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use crate::util::logger::Logger;
 use crate::prelude::*;
 use crate::prelude::hash_map::Entry;
-use core::{cmp, fmt};
+use core::{cmp, fmt, mem};
 use core::ops::{Deref, DerefMut};
 use core::time::Duration;
 use crate::io::{self, Read};
@@ -1143,6 +1143,11 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
 		}
 		None
 	}
+
+	/// Overwrite the scorer state with the given external scores.
+	pub fn set_scores(&mut self, external_scores: ChannelLiquidities) {
+		_ = mem::replace(&mut self.channel_liquidities, external_scores);
+	}
 }

 impl ChannelLiquidity {
@@ -1746,6 +1751,11 @@ impl<G: Deref<Target = NetworkGraph<L>> + Clone, L: Deref + Clone> CombinedScore
 			self.scorer.channel_liquidities.insert(scid, liquidity);
 		}
 	}
+
+	/// Overwrite the scorer state with the given external scores.
+	pub fn set_scores(&mut self, external_scores: ChannelLiquidities) {
+		self.scorer.set_scores(external_scores);
+	}
 }

@@ -3962,6 +3972,19 @@ mod tests {
 			},
 		};

+		let logger_rc = Rc::new(&logger);
+
+		let mut external_liquidity = ChannelLiquidity::new(Duration::ZERO);
+		external_liquidity.as_directed_mut(&source_node_id(), &target_node_id(), 1_000).successful(
+			1000,
+			Duration::ZERO,
+			format_args!("test channel"),
+			logger_rc.as_ref(),
+		);
+
+		let mut external_scores = ChannelLiquidities::new();
+		external_scores.insert(42, external_liquidity);
+
 		{
 			let network_graph = network_graph.read_only();
 			let channel = network_graph.channel(42).unwrap();
@@ -3971,16 +3994,7 @@ mod tests {

 			let penalty = combined_scorer.channel_penalty_msat(&candidate, usage, &params);

-			let mut external_liquidity = ChannelLiquidity::new(Duration::ZERO);
-			let logger_rc = Rc::new(&logger);
-			external_liquidity
-				.as_directed_mut(&source_node_id(), &target_node_id(), 1_000)
-				.successful(1000, Duration::ZERO, format_args!("test channel"), logger_rc.as_ref());
-
-			let mut external_scores = ChannelLiquidities::new();
-
-			external_scores.insert(42, external_liquidity);
-			combined_scorer.merge(external_scores, Duration::ZERO);
+			combined_scorer.merge(external_scores.clone(), Duration::ZERO);

 			let penalty_after_merge =
 				combined_scorer.channel_penalty_msat(&candidate, usage, &params);
@@ -3993,6 +4007,13 @@ mod tests {
 		let liquidity_range =
 			combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
 		assert_eq!(liquidity_range.unwrap(), (0, 300));
+
+		// Now set (overwrite) the scorer state with the external data, which should tighten the
+		// liquidity range even further: only the success from the external source is now
+		// considered, and it consumed the channel's full estimated liquidity.
+		combined_scorer.set_scores(external_scores);
+		let liquidity_range =
+			combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
+		assert_eq!(liquidity_range.unwrap(), (0, 0));
 	}
 }

From e9921ddb016dba1c6ce0b371e4ced7faf4956b62 Mon Sep 17 00:00:00 2001
From: Joost Jager
Date: Mon, 10 Feb 2025 12:47:34 +0100
Subject: [PATCH 5/5] add scores getter on ProbabilisticScorer

Allows access to the scorer state. An example use case is an LSP
exposing the global network view in its scorer over HTTP to light
clients.
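For example, a server could serialize the returned state for transport
roughly as follows (the HTTP layer itself is out of scope here):

    let scores = scorer.scores();
    let mut buf = Vec::new();
    scores.write(&mut buf).expect("in-memory writes cannot fail");
    // Serve `buf` to clients, which can deserialize it with
    // `ChannelLiquidities::read` and apply it via `CombinedScorer::merge`
    // or `set_scores`.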
---
 lightning/src/routing/scoring.rs | 36 ++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 9be66aebb76..bbdd7528465 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -1148,6 +1148,11 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
 	pub fn set_scores(&mut self, external_scores: ChannelLiquidities) {
 		_ = mem::replace(&mut self.channel_liquidities, external_scores);
 	}
+
+	/// Returns the current scores.
+	pub fn scores(&self) -> &ChannelLiquidities {
+		&self.channel_liquidities
+	}
 }

 impl ChannelLiquidity {
@@ -3943,6 +3948,37 @@ mod tests {
 			Some(0.0));
 	}

+	#[test]
+	fn get_scores() {
+		let logger = TestLogger::new();
+		let network_graph = network_graph(&logger);
+		let params = ProbabilisticScoringFeeParameters {
+			liquidity_penalty_multiplier_msat: 1_000,
+			..ProbabilisticScoringFeeParameters::zero_penalty()
+		};
+		let mut scorer = ProbabilisticScorer::new(ProbabilisticScoringDecayParameters::default(), &network_graph, &logger);
+		let source = source_node_id();
+		let usage = ChannelUsage {
+			amount_msat: 500,
+			inflight_htlc_msat: 0,
+			effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_000, htlc_maximum_msat: 1_000 },
+		};
+		let successful_path = payment_path_for_amount(200);
+		let channel = &network_graph.read_only().channel(42).unwrap().to_owned();
+		let (info, _) = channel.as_directed_from(&source).unwrap();
+		let candidate = CandidateRouteHop::PublicHop(PublicHopCandidate {
+			info,
+			short_channel_id: 41,
+		});
+
+		scorer.payment_path_successful(&successful_path, Duration::ZERO);
+		assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 301);
+
+		// Get the scores and assert that both channels are present in the returned struct.
+		let scores = scorer.scores();
+		assert_eq!(scores.iter().count(), 2);
+	}
+
 	#[test]
 	fn combined_scorer() {
 		let logger = TestLogger::new();
 		let network_graph = network_graph(&logger);