Async FilesystemStore #3931
Conversation
👋 Thanks for assigning @TheBlueMatt as a reviewer!
Force-pushed 29b8bcf to 81ad668
let this = Arc::clone(&self.inner);
Box::pin(async move {
	tokio::task::spawn_blocking(move || {
Mhh, so I'm not sure if spawning blocking tasks for every IO call is the way to go (see for example https://docs.rs/tokio/latest/tokio/fs/index.html#tuning-your-file-io: "To get good performance with file IO on Tokio, it is recommended to batch your operations into as few spawn_blocking calls as possible."). Maybe there are other designs that we should at least consider before moving forward with this approach. For example, we could create a dedicated pool of longer-lived worker task(s) that process a queue?
If we use spawn_blocking, can we give the user control over which runtime this exactly will be spawned on? Also, rather than just doing wrapping, should we be using tokio::fs?
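The "dedicated pool of longer-lived worker task(s) that process a queue" alternative mentioned above could look roughly like the following std-only sketch. The names (`Op`, `spawn_worker`) are made up for illustration; a real version would presumably live on Tokio's blocking pool or dedicated threads and perform actual filesystem IO.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sketch of the "long-lived worker processing a queue" design:
// all store operations are funneled through one thread instead of spawning a
// fresh blocking task per IO call.
enum Op {
    Write { key: String, buf: Vec<u8>, done: mpsc::Sender<std::io::Result<()>> },
}

fn spawn_worker() -> mpsc::Sender<Op> {
    let (tx, rx) = mpsc::channel::<Op>();
    thread::spawn(move || {
        // The worker loops until all senders are dropped.
        for op in rx {
            match op {
                Op::Write { key, buf, done } => {
                    // A real store would write `buf` to the file named by `key` here.
                    let _ = (key, buf);
                    let _ = done.send(Ok(()));
                }
            }
        }
    });
    tx
}
```

Because one thread drains the queue, operations are naturally serialized and adjacent writes could be batched, at the cost of losing IO parallelism across keys.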
Mhh, so I'm not sure if spawning blocking tasks for every IO call is the way to go (see for example https://docs.rs/tokio/latest/tokio/fs/index.html#tuning-your-file-io: "To get good performance with file IO on Tokio, it is recommended to batch your operations into as few spawn_blocking calls as possible.").
If we should batch operations, I think the current approach is better than using tokio::fs, because it already batches the various operations inside KVStoreSync::write.
Further batching probably needs to happen at a higher level in LDK, and might be a bigger change. Not sure if that is worth it just for FilesystemStore, especially when that store is not the preferred store for real-world usage?
For example, we could create a dedicated pool of longer-lived worker task(s) that process a queue?
Isn't Tokio doing that already when a task is spawned?
If we use spawn_blocking, can we give the user control over which runtime this exactly will be spawned on? Also, rather than just doing wrapping, should we be using tokio::fs?
With tokio::fs, the current runtime is used. I'd think that is then also sufficient if we spawn ourselves, without a need to specify which runtime exactly?
More generally, I think the main purpose of this PR is to show how an async KVStore could be implemented, and to have something for testing potentially. Additionally, if there are users that really want to use this type of store in production, they could. But I don't think it is something to spend too much time on. A remote database is probably the more important target to design for.
With tokio::fs, the current runtime is used. I'd think that is then also sufficient if we spawn ourselves, without a need to specify which runtime exactly?
Hmm, I'm not entirely sure, especially for users that have multiple runtime contexts floating around, it might be important to make sure the store uses a particular one (cc @domZippilli ?). I'll also have to think through this for LDK Node when we make the switch to async KVStore there, but happy to leave as-is for now.
lightning/src/util/persist.rs
Outdated
}

/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
/// data migration.
-pub trait MigratableKVStore: KVStore {
+pub trait MigratableKVStore: KVStoreSync {
How will we solve this for a KVStore?
I think this comment belongs in #3905?
We might not need to solve it now, as long as we still require a sync implementation alongside an async one? If we support async-only kvstores, then we can create an async version of this trait?
Force-pushed 81ad668 to e462bce
Removed the garbage collector, because we need to keep the last written version.
Force-pushed 97d6b3f to 02dce94
Codecov Report ❌ Patch coverage is …
Additional details and impacted files:

@@            Coverage Diff            @@
##              main     #3931    +/-  ##
=========================================
  Coverage    88.94%    88.94%
=========================================
  Files          174       174
  Lines       124201    124353    +152
  Branches    124201    124353    +152
=========================================
+ Hits        110472    110610    +138
- Misses       11251     11262     +11
- Partials      2478      2481      +3
Force-pushed c061fcd to 2492508
Force-pushed 9938dfe to 7d98528
Force-pushed 38ab949 to dd9e1b5
Updated code to not use an async wrapper, but conditionally expose the async … I didn't yet update the …
Approach ACK I think, but I find it really concerning that we'd double our memory footprint while writing here. I don't think that is really acceptable if the async KVStore is going to be our default. I think we'll need to find a solution before moving on.
This also needs a rebase now that #3799 landed.
lightning-persister/src/fs_store.rs
Outdated
let this = Arc::clone(&self.inner);

// Obtain a version number to retain the call sequence.
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
Very, very unlikely, but should we still panic if we'd ever hit u64::MAX, to avoid ever silently starting to skip all writes? Also, I'm not sure we really need SeqCst here.
Switched to Relaxed and added an overflow panic.
lightning-persister/src/fs_store.rs
Outdated
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let key = key.to_string();
let buf = buf.to_vec();
Ugh, all these reallocations are not great, in particular not for buf, which we then give as a reference to write_version. So we'll then hold the full encoded data in memory at least twice. If we don't find a way to deal with the references, I wonder if we should switch the async version of KVStore to take owned primary_namespace, secondary_namespace, key, buf args.
I think the problem is that those args would then need a 'static lifetime, because it isn't clear when they will be used by the runtime? Open to suggestions on how to do that.
Agreed that the copy is not ideal. Not sure it actually matters, given we also need to hit the disk with the same data.
Why don't we just pass a Vec in to the (async) KVStore API? That's basically what we have at all the callsites, and now that we have a real use for it on the other end we should change the API, IMO.
Why don't we just pass a Vec in to the (async) KVStore API? That's basically what we have at all the callsites, and now that we have a real use for it on the other end we should change the API, IMO.
Right, that's what I meant with 'owned args' above.
Got it now.
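To illustrate the owned-args idea discussed in this thread, here is a minimal std-only sketch. The trait and struct names are hypothetical (not the PR's actual API): with String/Vec<u8> taken by value, the returned future can be 'static without borrowing from the caller, and no second copy of the encoded data is needed.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

// Hypothetical shape of an async KVStore write taking owned arguments.
trait KVStoreAsync {
    fn write(
        &self, primary_namespace: String, secondary_namespace: String, key: String,
        buf: Vec<u8>,
    ) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'static>>;
}

// Toy in-memory implementation: the write is applied synchronously, and the
// returned future merely reports completion. A real store would move the
// owned args into the future and do the IO there.
#[derive(Default)]
struct MemStore {
    map: Arc<Mutex<HashMap<(String, String, String), Vec<u8>>>>,
}

impl KVStoreAsync for MemStore {
    fn write(
        &self, primary_namespace: String, secondary_namespace: String, key: String,
        buf: Vec<u8>,
    ) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'static>> {
        // `buf` is moved, not copied: the caller hands over ownership.
        self.map.lock().unwrap().insert((primary_namespace, secondary_namespace, key), buf);
        Box::pin(async { Ok::<(), io::Error>(()) })
    }
}
```

Note how the signature forces callers to give up their buffers, which is exactly what the callsites already have available according to the discussion above.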
Force-pushed dd9e1b5 to 05a4bb4
Rebased. Only diff is the counter overflow change.
This is blocked on #3974 now.
Force-pushed c824b41 to 460c716
let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
if version == u64::MAX {
	panic!("FilesystemStore version counter overflowed");
The overflow check occurs after fetch_add has already executed, which means if version_counter was at u64::MAX, it would wrap to 0 before the check detects the overflow. Consider either:
- Checking before incrementing:
  let current = self.version_counter.load(Ordering::Relaxed);
  if current == u64::MAX {
      panic!("FilesystemStore version counter would overflow");
  }
  let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
- Or using a wrapping increment with a separate check that compares the new value with the old:
  let old_version = self.version_counter.fetch_add(1, Ordering::Relaxed);
  if old_version > self.version_counter.load(Ordering::Relaxed) {
      panic!("FilesystemStore version counter overflowed");
  }
  let version = old_version;
Suggested change:
-let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
-if version == u64::MAX {
-	panic!("FilesystemStore version counter overflowed");
+let current = self.version_counter.load(Ordering::Relaxed);
+if current == u64::MAX {
+	panic!("FilesystemStore version counter would overflow");
+}
+let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
Spotted by Diamond
I think this is okay. Some thread is going to hit max and panic.
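For completeness, a wrap-free variant is also possible with a compare_exchange loop. This is only a sketch (hypothetical helper, not the PR's code): unlike a bare fetch_add, the increment is never committed once the counter reaches u64::MAX, so no thread can ever observe a wrapped value before the panic fires.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Allocate the next version without ever wrapping: the increment only
// succeeds when the current value is known to be below u64::MAX.
fn next_version(counter: &AtomicU64) -> u64 {
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        assert!(current < u64::MAX, "FilesystemStore version counter overflowed");
        match counter.compare_exchange_weak(
            current,
            current + 1,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            // On success, return the pre-increment value, like fetch_add does.
            Ok(prev) => return prev,
            // Another thread raced us; retry with the value it left behind.
            Err(observed) => current = observed,
        }
    }
}
```

In practice the simple fetch_add-then-panic is fine too, as noted above; the loop just trades a retry path for a fully race-free guarantee.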
Rebased on the latest #3974. No more unnecessary copies.
Updated the ldk-node try-out so that it compiles again. Need to think about what this is going to look like in the final version. A parent trait that combines Sync and Async might be useful?
Force-pushed 460c716 to c8b45ea
LGTM, mod one comment that should be addressed before we merge this.
lightning-persister/Cargo.toml
Outdated
[dependencies]
bitcoin = "0.32.2"
lightning = { version = "0.2.0", path = "../lightning" }
tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] }
Please add this with default-features = false. Also, we need macros only in tests, so it could be added as a dev dependency only:
diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml
index 5a515fbab..593e19a95 100644
--- a/lightning-persister/Cargo.toml
+++ b/lightning-persister/Cargo.toml
@@ -19,7 +19,7 @@ tokio = ["dep:tokio"]
[dependencies]
bitcoin = "0.32.2"
lightning = { version = "0.2.0", path = "../lightning" }
-tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] }
+tokio = { version = "1.35", optional = true, default-features = false, features = ["rt-multi-thread"] }
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
@@ -30,6 +30,7 @@ criterion = { version = "0.4", optional = true, default-features = false }
[dev-dependencies]
lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] }
bitcoin = { version = "0.32.2", default-features = false }
+tokio = { version = "1.35", default-features = false, features = ["macros"] }
[lints]
workspace = true
Thanks for the diff
Force-pushed c8b45ea to 9866309
LGTM
@@ -30,47 +39,100 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
	path.as_ref().encode_wide().chain(Some(0)).collect()
}

// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
Hmm, I don't really see why it's okay to drop the lock GC'ing. We'll end up slowly increasing memory over time for ChannelMonitorUpdate-writing clients without ever cleaning it up.
I removed the garbage collection because we need to keep track of the version number for each file. Is there a way to avoid that?
We can also track the number of in-flight writes and drop the tracking when there are no writes, no?
I think we can do that. But is it really a problem that we use a few bytes for each file that we have on disk?
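The "track in-flight writes and drop the tracking when there are no writes" idea could be sketched like this (hypothetical names, not the PR's code). The key observation is that once no operation is in flight for a key, any later write draws a strictly larger version from the global counter anyway, so the per-key state can be safely discarded.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Per-key state, kept only while at least one operation is in flight.
struct KeyState {
    inflight: usize,
    last_written_version: u64,
}

struct LockTable {
    entries: Mutex<HashMap<String, KeyState>>,
}

impl LockTable {
    // Called when an operation on `key` starts.
    fn begin(&self, key: &str) {
        let mut map = self.entries.lock().unwrap();
        map.entry(key.to_string())
            .or_insert(KeyState { inflight: 0, last_written_version: 0 })
            .inflight += 1;
    }

    // Called when an operation on `key` completes with the given version.
    fn end(&self, key: &str, version: u64) {
        let mut map = self.entries.lock().unwrap();
        if let Some(state) = map.get_mut(key) {
            state.inflight -= 1;
            state.last_written_version = state.last_written_version.max(version);
            let done = state.inflight == 0;
            // Garbage collect the entry once nothing is in flight for this key,
            // so the map scales with concurrent operations, not keys on disk.
            if done {
                map.remove(key);
            }
        }
    }
}
```

This keeps the memory footprint proportional to the number of concurrently pending operations rather than growing with every key ever written.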
})
}

fn remove(
Seems like we should really care about ordering for remove, no? I mean, I guess we could say that the remove operation happens in the future, unlike the write, which we consider to happen before the future in the method, but it seems more consistent to just have remove have ordering too. How hard would it be to add?
Yes, you are right. Remove is also a sort of write. I've added a fixup commit that uses the same version counter for writes and removes.
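The shared-counter approach can be sketched as follows (hypothetical types, not the fixup commit's actual code): writes and removes draw versions from one counter, and a completing operation is only applied if no newer write or remove for the same key already finished first.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Shared version state for write *and* remove operations.
struct Versioned {
    counter: AtomicU64,      // global: orders all writes and removes
    last_applied: AtomicU64, // per-key in a real store; one cell here for brevity
}

impl Versioned {
    // Assign the next version; both write and remove call this.
    fn next_version(&self) -> u64 {
        self.counter.fetch_add(1, Ordering::Relaxed) + 1
    }

    // Returns true if the op with `version` should hit the disk, false if a
    // newer write/remove already completed and this op must be skipped.
    fn try_apply(&self, version: u64) -> bool {
        // fetch_max returns the previous value: if it was below `version`,
        // we advanced the watermark and this op is the newest so far.
        self.last_applied.fetch_max(version, Ordering::Relaxed) < version
    }
}
```

With this, a remove issued after a write can never be undone by the older write's lagging blocking task, which is the ordering concern raised above.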
[dependencies]
bitcoin = "0.32.2"
lightning = { version = "0.2.0", path = "../lightning" }
tokio = { version = "1.35", optional = true, default-features = false, features = ["rt-multi-thread"] }
While it's true that we're relying on a multi-threaded runtime here, just setting the feature doesn't ensure we get one (you can still create a single-threaded runtime). I'm not entirely sure what the right answer is, but we need to at least document this requirement.
Is it a problem if it would be used with a single-threaded runtime, because isn't everything then just executed synchronously?
I tried the test with #[tokio::test(flavor = "current_thread")] and that seemed to work.
Ah, I figured it might be an issue, but if you tested it's definitely not. We can drop the feature here, then :)
We can't drop the feature, because spawn_blocking is only available with rt-multi-thread, I believe.
Async filesystem store with eventually consistent writes. It is just using tokio's spawn_blocking, because that is what tokio::fs would otherwise do as well. Using tokio::fs would make it complicated to reuse the sync code.
ldk-node try out: lightningdevkit/ldk-node@main...joostjager:ldk-node:async-fsstore