Skip to content

Commit dd9e1b5

Browse files
committed
Add async implementation of FilesystemStore
1 parent 55baa15 commit dd9e1b5

File tree

2 files changed

+200
-32
lines changed

2 files changed

+200
-32
lines changed

lightning-persister/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ edition = "2021"
1313
all-features = true
1414
rustdoc-args = ["--cfg", "docsrs"]
1515

16+
[features]
17+
tokio = ["dep:tokio"]
18+
1619
[dependencies]
1720
bitcoin = "0.32.2"
1821
lightning = { version = "0.2.0", path = "../lightning" }
22+
tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] }
1923

2024
[target.'cfg(windows)'.dependencies]
2125
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

lightning-persister/src/fs_store.rs

Lines changed: 196 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,18 @@ use std::collections::HashMap;
88
use std::fs;
99
use std::io::{Read, Write};
1010
use std::path::{Path, PathBuf};
11+
#[cfg(feature = "tokio")]
12+
use std::sync::atomic::AtomicU64;
1113
use std::sync::atomic::{AtomicUsize, Ordering};
1214
use std::sync::{Arc, Mutex, RwLock};
1315

16+
#[cfg(feature = "tokio")]
17+
use core::future::Future;
18+
#[cfg(feature = "tokio")]
19+
use core::pin::Pin;
20+
#[cfg(feature = "tokio")]
21+
use lightning::util::persist::KVStore;
22+
1423
#[cfg(target_os = "windows")]
1524
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
1625

@@ -30,43 +39,72 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3039
path.as_ref().encode_wide().chain(Some(0)).collect()
3140
}
3241

33-
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34-
const GC_LOCK_INTERVAL: usize = 25;
35-
36-
/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
37-
pub struct FilesystemStore {
42+
struct FilesystemStoreInner {
3843
data_dir: PathBuf,
3944
tmp_file_counter: AtomicUsize,
40-
gc_counter: AtomicUsize,
41-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
45+
46+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
47+
// latest written version per key.
48+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
49+
}
50+
51+
/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
52+
///
53+
/// [`KVStore`]: lightning::util::persist::KVStore
54+
pub struct FilesystemStore {
55+
inner: Arc<FilesystemStoreInner>,
56+
57+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
58+
// operations aren't sensitive to the order of execution.
59+
#[cfg(feature = "tokio")]
60+
version_counter: AtomicU64,
4261
}
4362

4463
impl FilesystemStore {
4564
/// Constructs a new [`FilesystemStore`].
4665
pub fn new(data_dir: PathBuf) -> Self {
4766
let locks = Mutex::new(HashMap::new());
4867
let tmp_file_counter = AtomicUsize::new(0);
49-
let gc_counter = AtomicUsize::new(1);
50-
Self { data_dir, tmp_file_counter, gc_counter, locks }
68+
Self {
69+
inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
70+
#[cfg(feature = "tokio")]
71+
version_counter: AtomicU64::new(0),
72+
}
5173
}
5274

5375
/// Returns the data directory.
5476
pub fn get_data_dir(&self) -> PathBuf {
55-
self.data_dir.clone()
77+
self.inner.data_dir.clone()
5678
}
79+
}
5780

58-
fn garbage_collect_locks(&self) {
59-
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
81+
impl KVStoreSync for FilesystemStore {
82+
fn read(
83+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
84+
) -> Result<Vec<u8>, lightning::io::Error> {
85+
self.inner.read(primary_namespace, secondary_namespace, key)
86+
}
6087

61-
if gc_counter % GC_LOCK_INTERVAL == 0 {
62-
// Take outer lock for the cleanup.
63-
let mut outer_lock = self.locks.lock().unwrap();
88+
fn write(
89+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
90+
) -> Result<(), lightning::io::Error> {
91+
self.inner.write_version(primary_namespace, secondary_namespace, key, buf, None)
92+
}
6493

65-
// Garbage collect all lock entries that are not referenced anymore.
66-
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
67-
}
94+
fn remove(
95+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
96+
) -> Result<(), lightning::io::Error> {
97+
self.inner.remove(primary_namespace, secondary_namespace, key, lazy)
98+
}
99+
100+
fn list(
101+
&self, primary_namespace: &str, secondary_namespace: &str,
102+
) -> Result<Vec<String>, lightning::io::Error> {
103+
self.inner.list(primary_namespace, secondary_namespace)
68104
}
105+
}
69106

107+
impl FilesystemStoreInner {
70108
fn get_dest_dir_path(
71109
&self, primary_namespace: &str, secondary_namespace: &str,
72110
) -> std::io::Result<PathBuf> {
@@ -90,9 +128,7 @@ impl FilesystemStore {
90128

91129
Ok(dest_dir_path)
92130
}
93-
}
94131

95-
impl KVStoreSync for FilesystemStore {
96132
fn read(
97133
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
98134
) -> lightning::io::Result<Vec<u8>> {
@@ -113,13 +149,14 @@ impl KVStoreSync for FilesystemStore {
113149
f.read_to_end(&mut buf)?;
114150
}
115151

116-
self.garbage_collect_locks();
117-
118152
Ok(buf)
119153
}
120154

121-
fn write(
155+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
156+
/// returns early without writing.
157+
fn write_version(
122158
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
159+
version: Option<u64>,
123160
) -> lightning::io::Result<()> {
124161
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
125162

@@ -153,7 +190,18 @@ impl KVStoreSync for FilesystemStore {
153190
let mut outer_lock = self.locks.lock().unwrap();
154191
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
155192
};
156-
let _guard = inner_lock_ref.write().unwrap();
193+
let mut last_written_version = inner_lock_ref.write().unwrap();
194+
195+
// If a version is provided, we check if we already have a newer version written. This is used in async
196+
// contexts to realize eventual consistency.
197+
if let Some(version) = version {
198+
if version <= *last_written_version {
199+
// If the version is not greater, we don't write the file.
200+
return Ok(());
201+
}
202+
203+
*last_written_version = version;
204+
}
157205

158206
#[cfg(not(target_os = "windows"))]
159207
{
@@ -200,8 +248,6 @@ impl KVStoreSync for FilesystemStore {
200248
}
201249
};
202250

203-
self.garbage_collect_locks();
204-
205251
res
206252
}
207253

@@ -295,8 +341,6 @@ impl KVStoreSync for FilesystemStore {
295341
}
296342
}
297343

298-
self.garbage_collect_locks();
299-
300344
Ok(())
301345
}
302346

@@ -325,12 +369,90 @@ impl KVStoreSync for FilesystemStore {
325369
keys.push(key);
326370
}
327371

328-
self.garbage_collect_locks();
329-
330372
Ok(keys)
331373
}
332374
}
333375

376+
// Async `KVStore` implementation. Every operation copies its borrowed arguments into owned values and then runs the
// matching blocking `FilesystemStoreInner` method through `tokio::task::spawn_blocking`. If the blocking task cannot
// be joined, the join error is reported to the caller as an `ErrorKind::Other` I/O error.
#[cfg(feature = "tokio")]
impl KVStore for FilesystemStore {
	// Reads the value stored under the given namespaces and key.
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
		// The returned future is `'static`, so it must own everything it touches.
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		let this = Arc::clone(&self.inner);

		Box::pin(async move {
			tokio::task::spawn_blocking(move || {
				this.read(&primary_namespace, &secondary_namespace, &key)
			})
			.await
			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
		})
	}

	// Writes `buf` under the given namespaces and key. The write is tagged with a version that is assigned when this
	// method is called (not when the returned future is polled), so that a write initiated earlier can never
	// overwrite one initiated later, regardless of the order in which the futures complete.
	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
	) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		let buf = buf.to_vec();
		let this = Arc::clone(&self.inner);

		// Obtain a version number to retain the call sequence. `fetch_add` returns the previous counter value, which
		// is 0 for the first call. The per-path "last written version" also starts out at 0 and `write_version` skips
		// any write whose version is not strictly greater, so handing out 0 would silently drop the very first
		// write. Versions therefore start at 1.
		let version = self.version_counter.fetch_add(1, Ordering::SeqCst) + 1;

		Box::pin(async move {
			tokio::task::spawn_blocking(move || {
				this.write_version(
					&primary_namespace,
					&secondary_namespace,
					&key,
					&buf,
					Some(version),
				)
			})
			.await
			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
		})
	}

	// Removes the entry stored under the given namespaces and key. `lazy` is forwarded unchanged to the blocking
	// implementation.
	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
	) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		let this = Arc::clone(&self.inner);

		Box::pin(async move {
			tokio::task::spawn_blocking(move || {
				this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
			})
			.await
			.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
		})
	}

	// Lists the keys stored under the given namespaces.
	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let this = Arc::clone(&self.inner);

		Box::pin(async move {
			tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
				.await
				.unwrap_or_else(|e| {
					Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
				})
		})
	}
}
455+
334456
fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {
335457
if let Some(ext) = p.extension() {
336458
#[cfg(target_os = "windows")]
@@ -427,7 +549,7 @@ fn get_key_from_dir_entry(p: &Path, base_path: &Path) -> Result<String, lightnin
427549

428550
impl MigratableKVStore for FilesystemStore {
429551
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
430-
let prefixed_dest = &self.data_dir;
552+
let prefixed_dest = &self.inner.data_dir;
431553
if !prefixed_dest.exists() {
432554
return Ok(Vec::new());
433555
}
@@ -511,7 +633,7 @@ mod tests {
511633
fn drop(&mut self) {
512634
// We test for invalid directory names, so it's OK if directory removal
513635
// fails.
514-
match fs::remove_dir_all(&self.data_dir) {
636+
match fs::remove_dir_all(&self.inner.data_dir) {
515637
Err(e) => println!("Failed to remove test persister directory: {}", e),
516638
_ => {},
517639
}
@@ -526,6 +648,48 @@ mod tests {
526648
do_read_write_remove_list_persist(&fs_store);
527649
}
528650

651+
// Exercises write, list, read and remove through the async `KVStore` interface, including two writes to the same key
// whose futures are awaited in the opposite order from which they were created.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn read_write_remove_list_persist_async() {
	use crate::fs_store::FilesystemStore;
	use lightning::util::persist::KVStore;
	use std::sync::Arc;

	let store_dir = std::env::temp_dir().join("test_read_write_remove_list_persist_async");
	let store: Arc<dyn KVStore> = Arc::new(FilesystemStore::new(store_dir));

	let (primary_namespace, secondary_namespace, key) = ("testspace", "testsubspace", "testkey");
	let older_payload = [42u8; 32];
	let newer_payload = [43u8; 32];

	// Create both write futures for the same key first (older payload, then newer payload), and only afterwards
	// drive them in reverse order. The store is expected to end up holding the payload of the later `write` call.
	let older_write = store.write(primary_namespace, secondary_namespace, key, &older_payload);
	let newer_write = store.write(primary_namespace, secondary_namespace, key, &newer_payload);

	newer_write.await.unwrap();
	older_write.await.unwrap();

	// Listing must report exactly the one key that was written.
	let keys_after_write = store.list(primary_namespace, secondary_namespace).await.unwrap();
	assert_eq!(keys_after_write.len(), 1);
	assert_eq!(keys_after_write[0], key);

	// Reading must return the newer payload, because its write call was initiated later.
	let stored = store.read(primary_namespace, secondary_namespace, key).await.unwrap();
	assert_eq!(&newer_payload[..], stored.as_slice());

	// After a non-lazy remove the namespace must list as empty.
	store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();

	let keys_after_remove = store.list(primary_namespace, secondary_namespace).await.unwrap();
	assert!(keys_after_remove.is_empty());
}
692+
529693
#[test]
530694
fn test_data_migration() {
531695
let mut source_temp_path = std::env::temp_dir();

0 commit comments

Comments
 (0)