Skip to content

Commit f4e8d62

Browse files
committed
f: versioned remove
1 parent 9866309 commit f4e8d62

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

lightning-persister/src/fs_store.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl KVStoreSync for FilesystemStore {
116116
Some(key),
117117
"remove",
118118
)?;
119-
self.inner.remove(path, lazy)
119+
self.inner.remove_version(path, lazy, None)
120120
}
121121

122122
fn list(
@@ -221,11 +221,11 @@ impl FilesystemStoreInner {
221221
};
222222
let mut last_written_version = inner_lock_ref.write().unwrap();
223223

224-
// If a version is provided, we check if we already have a newer version written. This is used in async
225-
// contexts to realize eventual consistency.
224+
// If a version is provided, we check if we already have a newer version written/removed. This is used in
225+
// async contexts to realize eventual consistency.
226226
if let Some(version) = version {
227227
if version <= *last_written_version {
228-
// If the version is not greater, we don't write the file.
228+
// If the version is not greater, we don't touch the file.
229229
return Ok(());
230230
}
231231

@@ -280,7 +280,9 @@ impl FilesystemStoreInner {
280280
res
281281
}
282282

283-
fn remove(&self, dest_file_path: PathBuf, lazy: bool) -> lightning::io::Result<()> {
283+
fn remove_version(
284+
&self, dest_file_path: PathBuf, lazy: bool, version: Option<u64>,
285+
) -> lightning::io::Result<()> {
284286
if !dest_file_path.is_file() {
285287
return Ok(());
286288
}
@@ -290,7 +292,18 @@ impl FilesystemStoreInner {
290292
let mut outer_lock = self.locks.lock().unwrap();
291293
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
292294
};
293-
let _guard = inner_lock_ref.write().unwrap();
295+
let mut last_written_version = inner_lock_ref.write().unwrap();
296+
297+
// If a version is provided, we check if we already have a newer version written/removed. This is used in
298+
// async contexts to realize eventual consistency.
299+
if let Some(version) = version {
300+
if version <= *last_written_version {
301+
// If the version is not greater, we don't touch the file.
302+
return Ok(());
303+
}
304+
305+
*last_written_version = version;
306+
}
294307

295308
if lazy {
296309
// If we're lazy we just call remove and be done with it.
@@ -477,10 +490,18 @@ impl KVStore for FilesystemStore {
477490
Err(e) => return Box::pin(async move { Err(e) }),
478491
};
479492

493+
// Obtain a version number to retain the call sequence.
494+
let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
495+
if version == u64::MAX {
496+
panic!("FilesystemStore version counter overflowed");
497+
}
498+
480499
Box::pin(async move {
481-
tokio::task::spawn_blocking(move || this.remove(path, lazy)).await.unwrap_or_else(|e| {
482-
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
483-
})
500+
tokio::task::spawn_blocking(move || this.remove_version(path, lazy, Some(version)))
501+
.await
502+
.unwrap_or_else(|e| {
503+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
504+
})
484505
})
485506
}
486507

@@ -720,8 +741,10 @@ mod tests {
720741
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
721742
// that eventual consistency works.
722743
let fut1 = fs_store.write(primary_namespace, secondary_namespace, key, data1);
723-
let fut2 = fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
744+
let fut2 = fs_store.remove(primary_namespace, secondary_namespace, key, false);
745+
let fut3 = fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
724746

747+
fut3.await.unwrap();
725748
fut2.await.unwrap();
726749
fut1.await.unwrap();
727750

0 commit comments

Comments
 (0)