Skip to content

Commit 9151340

Browse files
authored
Merge pull request #544 from tnull/2025-05-generalized-data-store
Introduce generalized `DataStore`
2 parents 5586b69 + 75aa069 commit 9151340

File tree

10 files changed

+453
-347
lines changed

10 files changed

+453
-347
lines changed

src/builder.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,23 @@ use crate::gossip::GossipSource;
1818
use crate::io::sqlite_store::SqliteStore;
1919
use crate::io::utils::{read_node_metrics, write_node_metrics};
2020
use crate::io::vss_store::VssStore;
21+
use crate::io::{
22+
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
23+
};
2124
use crate::liquidity::{
2225
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
2326
};
2427
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
2528
use crate::message_handler::NodeCustomMessageHandler;
26-
use crate::payment::store::PaymentStore;
2729
use crate::peer_store::PeerStore;
2830
use crate::tx_broadcaster::TransactionBroadcaster;
2931
use crate::types::{
3032
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
31-
OnionMessenger, PeerManager,
33+
OnionMessenger, PaymentStore, PeerManager,
3234
};
3335
use crate::wallet::persist::KVStoreWalletPersister;
3436
use crate::wallet::Wallet;
35-
use crate::{io, Node, NodeMetrics};
37+
use crate::{Node, NodeMetrics};
3638

3739
use lightning::chain::{chainmonitor, BestBlock, Watch};
3840
use lightning::io::Cursor;
@@ -1015,9 +1017,13 @@ fn build_with_store_internal(
10151017
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
10161018

10171019
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
1018-
Ok(payments) => {
1019-
Arc::new(PaymentStore::new(payments, Arc::clone(&kv_store), Arc::clone(&logger)))
1020-
},
1020+
Ok(payments) => Arc::new(PaymentStore::new(
1021+
payments,
1022+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1023+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1024+
Arc::clone(&kv_store),
1025+
Arc::clone(&logger),
1026+
)),
10211027
Err(_) => {
10221028
return Err(BuildError::ReadFailed);
10231029
},

src/data_store.rs

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use crate::logger::{log_error, LdkLogger};
9+
use crate::types::DynStore;
10+
use crate::Error;
11+
12+
use lightning::util::ser::{Readable, Writeable};
13+
14+
use std::collections::hash_map;
15+
use std::collections::HashMap;
16+
use std::ops::Deref;
17+
use std::sync::{Arc, Mutex};
18+
19+
pub(crate) trait StorableObject: Clone + Readable + Writeable {
20+
type Id: StorableObjectId;
21+
type Update: StorableObjectUpdate<Self>;
22+
23+
fn id(&self) -> Self::Id;
24+
fn update(&mut self, update: &Self::Update) -> bool;
25+
fn to_update(&self) -> Self::Update;
26+
}
27+
28+
pub(crate) trait StorableObjectId: std::hash::Hash + PartialEq + Eq {
29+
fn encode_to_hex_str(&self) -> String;
30+
}
31+
32+
pub(crate) trait StorableObjectUpdate<SO: StorableObject> {
33+
fn id(&self) -> SO::Id;
34+
}
35+
36+
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
37+
pub(crate) enum DataStoreUpdateResult {
38+
Updated,
39+
Unchanged,
40+
NotFound,
41+
}
42+
43+
pub(crate) struct DataStore<SO: StorableObject, L: Deref>
44+
where
45+
L::Target: LdkLogger,
46+
{
47+
objects: Mutex<HashMap<SO::Id, SO>>,
48+
primary_namespace: String,
49+
secondary_namespace: String,
50+
kv_store: Arc<DynStore>,
51+
logger: L,
52+
}
53+
54+
impl<SO: StorableObject, L: Deref> DataStore<SO, L>
55+
where
56+
L::Target: LdkLogger,
57+
{
58+
pub(crate) fn new(
59+
objects: Vec<SO>, primary_namespace: String, secondary_namespace: String,
60+
kv_store: Arc<DynStore>, logger: L,
61+
) -> Self {
62+
let objects =
63+
Mutex::new(HashMap::from_iter(objects.into_iter().map(|obj| (obj.id(), obj))));
64+
Self { objects, primary_namespace, secondary_namespace, kv_store, logger }
65+
}
66+
67+
pub(crate) fn insert(&self, object: SO) -> Result<bool, Error> {
68+
let mut locked_objects = self.objects.lock().unwrap();
69+
70+
self.persist(&object)?;
71+
let updated = locked_objects.insert(object.id(), object).is_some();
72+
Ok(updated)
73+
}
74+
75+
pub(crate) fn insert_or_update(&self, object: SO) -> Result<bool, Error> {
76+
let mut locked_objects = self.objects.lock().unwrap();
77+
78+
let updated;
79+
match locked_objects.entry(object.id()) {
80+
hash_map::Entry::Occupied(mut e) => {
81+
let update = object.to_update();
82+
updated = e.get_mut().update(&update);
83+
if updated {
84+
self.persist(&e.get())?;
85+
}
86+
},
87+
hash_map::Entry::Vacant(e) => {
88+
e.insert(object.clone());
89+
self.persist(&object)?;
90+
updated = true;
91+
},
92+
}
93+
94+
Ok(updated)
95+
}
96+
97+
pub(crate) fn remove(&self, id: &SO::Id) -> Result<(), Error> {
98+
let removed = self.objects.lock().unwrap().remove(id).is_some();
99+
if removed {
100+
let store_key = id.encode_to_hex_str();
101+
self.kv_store
102+
.remove(&self.primary_namespace, &self.secondary_namespace, &store_key, false)
103+
.map_err(|e| {
104+
log_error!(
105+
self.logger,
106+
"Removing object data for key {}/{}/{} failed due to: {}",
107+
&self.primary_namespace,
108+
&self.secondary_namespace,
109+
store_key,
110+
e
111+
);
112+
Error::PersistenceFailed
113+
})?;
114+
}
115+
Ok(())
116+
}
117+
118+
pub(crate) fn get(&self, id: &SO::Id) -> Option<SO> {
119+
self.objects.lock().unwrap().get(id).cloned()
120+
}
121+
122+
pub(crate) fn update(&self, update: &SO::Update) -> Result<DataStoreUpdateResult, Error> {
123+
let mut locked_objects = self.objects.lock().unwrap();
124+
125+
if let Some(object) = locked_objects.get_mut(&update.id()) {
126+
let updated = object.update(update);
127+
if updated {
128+
self.persist(&object)?;
129+
Ok(DataStoreUpdateResult::Updated)
130+
} else {
131+
Ok(DataStoreUpdateResult::Unchanged)
132+
}
133+
} else {
134+
Ok(DataStoreUpdateResult::NotFound)
135+
}
136+
}
137+
138+
pub(crate) fn list_filter<F: FnMut(&&SO) -> bool>(&self, f: F) -> Vec<SO> {
139+
self.objects.lock().unwrap().values().filter(f).cloned().collect::<Vec<SO>>()
140+
}
141+
142+
fn persist(&self, object: &SO) -> Result<(), Error> {
143+
let store_key = object.id().encode_to_hex_str();
144+
let data = object.encode();
145+
self.kv_store
146+
.write(&self.primary_namespace, &self.secondary_namespace, &store_key, &data)
147+
.map_err(|e| {
148+
log_error!(
149+
self.logger,
150+
"Write for key {}/{}/{} failed due to: {}",
151+
&self.primary_namespace,
152+
&self.secondary_namespace,
153+
store_key,
154+
e
155+
);
156+
Error::PersistenceFailed
157+
})?;
158+
Ok(())
159+
}
160+
}
161+
162+
#[cfg(test)]
163+
mod tests {
164+
use lightning::impl_writeable_tlv_based;
165+
use lightning::util::test_utils::{TestLogger, TestStore};
166+
167+
use crate::hex_utils;
168+
169+
use super::*;
170+
171+
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
172+
struct TestObjectId {
173+
id: [u8; 4],
174+
}
175+
176+
impl StorableObjectId for TestObjectId {
177+
fn encode_to_hex_str(&self) -> String {
178+
hex_utils::to_string(&self.id)
179+
}
180+
}
181+
impl_writeable_tlv_based!(TestObjectId, { (0, id, required) });
182+
183+
struct TestObjectUpdate {
184+
id: TestObjectId,
185+
data: [u8; 3],
186+
}
187+
impl StorableObjectUpdate<TestObject> for TestObjectUpdate {
188+
fn id(&self) -> TestObjectId {
189+
self.id
190+
}
191+
}
192+
193+
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
194+
struct TestObject {
195+
id: TestObjectId,
196+
data: [u8; 3],
197+
}
198+
199+
impl StorableObject for TestObject {
200+
type Id = TestObjectId;
201+
type Update = TestObjectUpdate;
202+
203+
fn id(&self) -> Self::Id {
204+
self.id
205+
}
206+
207+
fn update(&mut self, update: &Self::Update) -> bool {
208+
if self.data != update.data {
209+
self.data = update.data;
210+
true
211+
} else {
212+
false
213+
}
214+
}
215+
216+
fn to_update(&self) -> Self::Update {
217+
Self::Update { id: self.id, data: self.data }
218+
}
219+
}
220+
221+
impl_writeable_tlv_based!(TestObject, {
222+
(0, id, required),
223+
(2, data, required),
224+
});
225+
226+
#[test]
227+
fn data_is_persisted() {
228+
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
229+
let logger = Arc::new(TestLogger::new());
230+
let primary_namespace = "datastore_test_primary".to_string();
231+
let secondary_namespace = "datastore_test_secondary".to_string();
232+
let data_store: DataStore<TestObject, Arc<TestLogger>> = DataStore::new(
233+
Vec::new(),
234+
primary_namespace.clone(),
235+
secondary_namespace.clone(),
236+
Arc::clone(&store),
237+
logger,
238+
);
239+
240+
let id = TestObjectId { id: [42u8; 4] };
241+
assert!(data_store.get(&id).is_none());
242+
243+
let store_key = id.encode_to_hex_str();
244+
245+
// Check we start empty.
246+
assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_err());
247+
248+
// Check we successfully store an object and return `false`
249+
let object = TestObject { id, data: [23u8; 3] };
250+
assert_eq!(Ok(false), data_store.insert(object.clone()));
251+
assert_eq!(Some(object), data_store.get(&id));
252+
assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_ok());
253+
254+
// Test re-insertion returns `true`
255+
let mut override_object = object.clone();
256+
override_object.data = [24u8; 3];
257+
assert_eq!(Ok(true), data_store.insert(override_object));
258+
assert_eq!(Some(override_object), data_store.get(&id));
259+
260+
// Check update returns `Updated`
261+
let update = TestObjectUpdate { id, data: [25u8; 3] };
262+
assert_eq!(Ok(DataStoreUpdateResult::Updated), data_store.update(&update));
263+
assert_eq!(data_store.get(&id).unwrap().data, [25u8; 3]);
264+
265+
// Check no-op update yields `Unchanged`
266+
let update = TestObjectUpdate { id, data: [25u8; 3] };
267+
assert_eq!(Ok(DataStoreUpdateResult::Unchanged), data_store.update(&update));
268+
269+
// Check bogus update yields `NotFound`
270+
let bogus_id = TestObjectId { id: [84u8; 4] };
271+
let update = TestObjectUpdate { id: bogus_id, data: [12u8; 3] };
272+
assert_eq!(Ok(DataStoreUpdateResult::NotFound), data_store.update(&update));
273+
274+
// Check `insert_or_update` inserts unknown objects
275+
let iou_id = TestObjectId { id: [55u8; 4] };
276+
let iou_object = TestObject { id: iou_id, data: [34u8; 3] };
277+
assert_eq!(Ok(true), data_store.insert_or_update(iou_object.clone()));
278+
279+
// Check `insert_or_update` doesn't update the same object
280+
assert_eq!(Ok(false), data_store.insert_or_update(iou_object.clone()));
281+
282+
// Check `insert_or_update` updates if object changed
283+
let mut new_iou_object = iou_object;
284+
new_iou_object.data[0] += 1;
285+
assert_eq!(Ok(true), data_store.insert_or_update(new_iou_object));
286+
}
287+
}

src/event.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::types::{CustomTlvRecord, DynStore, Sweeper, Wallet};
8+
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
99

1010
use crate::{
1111
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
@@ -14,13 +14,13 @@ use crate::{
1414

1515
use crate::config::{may_announce_channel, Config};
1616
use crate::connection::ConnectionManager;
17+
use crate::data_store::DataStoreUpdateResult;
1718
use crate::fee_estimator::ConfirmationTarget;
1819
use crate::liquidity::LiquiditySource;
1920
use crate::logger::Logger;
2021

2122
use crate::payment::store::{
2223
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
23-
PaymentStore, PaymentStoreUpdateResult,
2424
};
2525

2626
use crate::io::{
@@ -449,7 +449,7 @@ where
449449
output_sweeper: Arc<Sweeper>,
450450
network_graph: Arc<Graph>,
451451
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
452-
payment_store: Arc<PaymentStore<L>>,
452+
payment_store: Arc<PaymentStore>,
453453
peer_store: Arc<PeerStore<L>>,
454454
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
455455
logger: L,
@@ -466,7 +466,7 @@ where
466466
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467467
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468468
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469-
payment_store: Arc<PaymentStore<L>>, peer_store: Arc<PeerStore<L>>,
469+
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
470470
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471471
) -> Self {
472472
Self {
@@ -906,12 +906,11 @@ where
906906
};
907907

908908
match self.payment_store.update(&update) {
909-
Ok(PaymentStoreUpdateResult::Updated)
910-
| Ok(PaymentStoreUpdateResult::Unchanged) => (
909+
Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (
911910
// No need to do anything if the idempotent update was applied, which might
912911
// be the result of a replayed event.
913912
),
914-
Ok(PaymentStoreUpdateResult::NotFound) => {
913+
Ok(DataStoreUpdateResult::NotFound) => {
915914
log_error!(
916915
self.logger,
917916
"Claimed payment with ID {} couldn't be found in store",

0 commit comments

Comments
 (0)