Skip to content

Commit 05a4bb4

Browse files
committed
Add async implementation of FilesystemStore
1 parent 8fd7cbf commit 05a4bb4

File tree

2 files changed

+203
-32
lines changed

2 files changed

+203
-32
lines changed

lightning-persister/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ edition = "2021"
1313
all-features = true
1414
rustdoc-args = ["--cfg", "docsrs"]
1515

16+
[features]
17+
tokio = ["dep:tokio"]
18+
1619
[dependencies]
1720
bitcoin = "0.32.2"
1821
lightning = { version = "0.2.0", path = "../lightning" }
22+
tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] }
1923

2024
[target.'cfg(windows)'.dependencies]
2125
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

lightning-persister/src/fs_store.rs

Lines changed: 199 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,18 @@ use std::collections::HashMap;
88
use std::fs;
99
use std::io::{Read, Write};
1010
use std::path::{Path, PathBuf};
11+
#[cfg(feature = "tokio")]
12+
use std::sync::atomic::AtomicU64;
1113
use std::sync::atomic::{AtomicUsize, Ordering};
1214
use std::sync::{Arc, Mutex, RwLock};
1315

16+
#[cfg(feature = "tokio")]
17+
use core::future::Future;
18+
#[cfg(feature = "tokio")]
19+
use core::pin::Pin;
20+
#[cfg(feature = "tokio")]
21+
use lightning::util::persist::KVStore;
22+
1423
#[cfg(target_os = "windows")]
1524
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
1625

@@ -30,47 +39,76 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3039
path.as_ref().encode_wide().chain(Some(0)).collect()
3140
}
3241

33-
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34-
const GC_LOCK_INTERVAL: usize = 25;
35-
3642
// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching
3743
// a consistent view and error out.
3844
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
3945

40-
/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
41-
pub struct FilesystemStore {
46+
struct FilesystemStoreInner {
4247
data_dir: PathBuf,
4348
tmp_file_counter: AtomicUsize,
44-
gc_counter: AtomicUsize,
45-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
49+
50+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
51+
// latest written version per key.
52+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
53+
}
54+
55+
/// A [`KVStoreSync`] implementation that writes to and reads from the file system. When the `tokio` feature is
/// enabled, it additionally implements the asynchronous [`KVStore`] trait.
56+
///
57+
/// [`KVStore`]: lightning::util::persist::KVStore
58+
pub struct FilesystemStore {
59+
// Shared store state. It is reference-counted so that the async implementation can clone a handle into the
// blocking tasks it spawns.
inner: Arc<FilesystemStoreInner>,
60+
61+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
62+
// operations aren't sensitive to the order of execution.
// A new version is drawn from this counter each time the async `write` is called.
63+
#[cfg(feature = "tokio")]
64+
version_counter: AtomicU64,
4665
}
4766

4867
impl FilesystemStore {
4968
/// Constructs a new [`FilesystemStore`].
5069
pub fn new(data_dir: PathBuf) -> Self {
5170
let locks = Mutex::new(HashMap::new());
5271
let tmp_file_counter = AtomicUsize::new(0);
53-
let gc_counter = AtomicUsize::new(1);
54-
Self { data_dir, tmp_file_counter, gc_counter, locks }
72+
Self {
73+
inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
74+
#[cfg(feature = "tokio")]
75+
version_counter: AtomicU64::new(0),
76+
}
5577
}
5678

5779
/// Returns the data directory.
5880
pub fn get_data_dir(&self) -> PathBuf {
59-
self.data_dir.clone()
81+
self.inner.data_dir.clone()
82+
}
83+
}
84+
85+
impl KVStoreSync for FilesystemStore {
86+
fn read(
87+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
88+
) -> Result<Vec<u8>, lightning::io::Error> {
89+
self.inner.read(primary_namespace, secondary_namespace, key)
6090
}
6191

62-
fn garbage_collect_locks(&self) {
63-
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
92+
fn write(
93+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
94+
) -> Result<(), lightning::io::Error> {
95+
self.inner.write_version(primary_namespace, secondary_namespace, key, buf, None)
96+
}
6497

65-
if gc_counter % GC_LOCK_INTERVAL == 0 {
66-
// Take outer lock for the cleanup.
67-
let mut outer_lock = self.locks.lock().unwrap();
98+
fn remove(
99+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
100+
) -> Result<(), lightning::io::Error> {
101+
self.inner.remove(primary_namespace, secondary_namespace, key, lazy)
102+
}
68103

69-
// Garbage collect all lock entries that are not referenced anymore.
70-
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
71-
}
104+
fn list(
105+
&self, primary_namespace: &str, secondary_namespace: &str,
106+
) -> Result<Vec<String>, lightning::io::Error> {
107+
self.inner.list(primary_namespace, secondary_namespace)
72108
}
109+
}
73110

111+
impl FilesystemStoreInner {
74112
fn get_dest_dir_path(
75113
&self, primary_namespace: &str, secondary_namespace: &str,
76114
) -> std::io::Result<PathBuf> {
@@ -94,9 +132,7 @@ impl FilesystemStore {
94132

95133
Ok(dest_dir_path)
96134
}
97-
}
98135

99-
impl KVStoreSync for FilesystemStore {
100136
fn read(
101137
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
102138
) -> lightning::io::Result<Vec<u8>> {
@@ -117,13 +153,14 @@ impl KVStoreSync for FilesystemStore {
117153
f.read_to_end(&mut buf)?;
118154
}
119155

120-
self.garbage_collect_locks();
121-
122156
Ok(buf)
123157
}
124158

125-
fn write(
159+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
160+
/// returns early without writing.
161+
fn write_version(
126162
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
163+
version: Option<u64>,
127164
) -> lightning::io::Result<()> {
128165
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
129166

@@ -157,7 +194,18 @@ impl KVStoreSync for FilesystemStore {
157194
let mut outer_lock = self.locks.lock().unwrap();
158195
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
159196
};
160-
let _guard = inner_lock_ref.write().unwrap();
197+
let mut last_written_version = inner_lock_ref.write().unwrap();
198+
199+
// If a version is provided, we check if we already have a newer version written. This is used in async
200+
// contexts to realize eventual consistency.
201+
if let Some(version) = version {
202+
if version <= *last_written_version {
203+
// If the version is not greater, we don't write the file.
204+
return Ok(());
205+
}
206+
207+
*last_written_version = version;
208+
}
161209

162210
#[cfg(not(target_os = "windows"))]
163211
{
@@ -204,8 +252,6 @@ impl KVStoreSync for FilesystemStore {
204252
}
205253
};
206254

207-
self.garbage_collect_locks();
208-
209255
res
210256
}
211257

@@ -299,8 +345,6 @@ impl KVStoreSync for FilesystemStore {
299345
}
300346
}
301347

302-
self.garbage_collect_locks();
303-
304348
Ok(())
305349
}
306350

@@ -351,12 +395,93 @@ impl KVStoreSync for FilesystemStore {
351395
break 'retry_list;
352396
}
353397

354-
self.garbage_collect_locks();
355-
356398
Ok(keys)
357399
}
358400
}
359401

402+
#[cfg(feature = "tokio")]
impl KVStore for FilesystemStore {
    /// Asynchronously reads the value stored under the given namespaces and key.
    ///
    /// The arguments are copied into owned values so that the returned future is `'static`. The
    /// filesystem access itself runs inside `tokio::task::spawn_blocking` once the future is polled.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying read, or an `ErrorKind::Other` error wrapping
    /// the join error if the blocking task could not be joined.
    fn read(
        &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
        let primary_namespace = primary_namespace.to_string();
        let secondary_namespace = secondary_namespace.to_string();
        let key = key.to_string();
        let this = Arc::clone(&self.inner);

        Box::pin(async move {
            tokio::task::spawn_blocking(move || {
                this.read(&primary_namespace, &secondary_namespace, &key)
            })
            .await
            .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
        })
    }

    /// Asynchronously writes `buf` under the given namespaces and key.
    ///
    /// A version number is assigned synchronously, at the time this method is called, and passed
    /// on to `write_version`. If the returned futures for the same key are driven out of order,
    /// a write carrying an older version than the one already written is skipped, so the data of
    /// the most recent `write` call wins.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying write, or an `ErrorKind::Other` error wrapping
    /// the join error if the blocking task could not be joined.
    ///
    /// # Panics
    ///
    /// Panics if the version counter overflows.
    fn write(
        &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
    ) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
        let primary_namespace = primary_namespace.to_string();
        let secondary_namespace = secondary_namespace.to_string();
        let key = key.to_string();
        let buf = buf.to_vec();
        let this = Arc::clone(&self.inner);

        // Obtain a version number to retain the call sequence. This happens before the future is
        // created, so the version reflects the order of `write` calls rather than the order in
        // which the futures are polled.
        //
        // Versions handed out here start at 1, not 0: a freshly created per-path lock stores
        // version 0 and `write_version` skips every version that is `<=` the stored one, so a
        // write carrying version 0 would be dropped silently while still reporting `Ok(())`.
        let version = self
            .version_counter
            .fetch_add(1, Ordering::Relaxed)
            .checked_add(1)
            .expect("FilesystemStore version counter overflowed");

        Box::pin(async move {
            tokio::task::spawn_blocking(move || {
                this.write_version(
                    &primary_namespace,
                    &secondary_namespace,
                    &key,
                    &buf,
                    Some(version),
                )
            })
            .await
            .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
        })
    }

    /// Asynchronously removes the entry stored under the given namespaces and key.
    ///
    /// Removals are not versioned; they are not ordered relative to pending writes.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying removal, or an `ErrorKind::Other` error
    /// wrapping the join error if the blocking task could not be joined.
    fn remove(
        &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
    ) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
        let primary_namespace = primary_namespace.to_string();
        let secondary_namespace = secondary_namespace.to_string();
        let key = key.to_string();
        let this = Arc::clone(&self.inner);

        Box::pin(async move {
            tokio::task::spawn_blocking(move || {
                this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
            })
            .await
            .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
        })
    }

    /// Asynchronously lists the keys stored under the given namespaces.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying listing, or an `ErrorKind::Other` error
    /// wrapping the join error if the blocking task could not be joined.
    fn list(
        &self, primary_namespace: &str, secondary_namespace: &str,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
        let primary_namespace = primary_namespace.to_string();
        let secondary_namespace = secondary_namespace.to_string();
        let this = Arc::clone(&self.inner);

        Box::pin(async move {
            tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
                .await
                .unwrap_or_else(|e| {
                    Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
                })
        })
    }
}
484+
360485
fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
361486
let p = dir_entry.path();
362487
if let Some(ext) = p.extension() {
@@ -447,7 +572,7 @@ fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lig
447572

448573
impl MigratableKVStore for FilesystemStore {
449574
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
450-
let prefixed_dest = &self.data_dir;
575+
let prefixed_dest = &self.inner.data_dir;
451576
if !prefixed_dest.exists() {
452577
return Ok(Vec::new());
453578
}
@@ -534,7 +659,7 @@ mod tests {
534659
fn drop(&mut self) {
535660
// We test for invalid directory names, so it's OK if directory removal
536661
// fails.
537-
match fs::remove_dir_all(&self.data_dir) {
662+
match fs::remove_dir_all(&self.inner.data_dir) {
538663
Err(e) => println!("Failed to remove test persister directory: {}", e),
539664
_ => {},
540665
}
@@ -549,6 +674,48 @@ mod tests {
549674
do_read_write_remove_list_persist(&fs_store);
550675
}
551676

677+
#[cfg(feature = "tokio")]
#[tokio::test]
async fn read_write_remove_list_persist_async() {
    use crate::fs_store::FilesystemStore;
    use lightning::util::persist::KVStore;
    use std::sync::Arc;

    // Namespaces and key shared by every operation below.
    let primary_namespace = "testspace";
    let secondary_namespace = "testsubspace";
    let key = "testkey";

    // Two distinct payloads, so we can tell which write ended up on disk.
    let older_payload = [42u8; 32];
    let newer_payload = [43u8; 32];

    let store_dir = std::env::temp_dir().join("test_read_write_remove_list_persist_async");
    let store: Arc<dyn KVStore> = Arc::new(FilesystemStore::new(store_dir));

    // Issue both writes for the same key up front, then drive the futures in reverse order. The
    // store is expected to end up with the payload of the write that was *called* last.
    let older_write = store.write(primary_namespace, secondary_namespace, key, &older_payload);
    let newer_write = store.write(primary_namespace, secondary_namespace, key, &newer_payload);

    newer_write.await.unwrap();
    older_write.await.unwrap();

    // Listing must report exactly the one key we wrote.
    let keys_after_write = store.list(primary_namespace, secondary_namespace).await.unwrap();
    assert_eq!(keys_after_write.len(), 1);
    assert_eq!(keys_after_write[0], key);

    // Reading must yield the payload of the later `write` call.
    let stored = store.read(primary_namespace, secondary_namespace, key).await.unwrap();
    assert_eq!(newer_payload, &*stored);

    // After removal the namespace must be empty again.
    store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();

    let keys_after_remove = store.list(primary_namespace, secondary_namespace).await.unwrap();
    assert_eq!(keys_after_remove.len(), 0);
}
718+
552719
#[test]
553720
fn test_data_migration() {
554721
let mut source_temp_path = std::env::temp_dir();

0 commit comments

Comments
 (0)