Skip to content

Commit 9241b00

Browse files
authored
Merge pull request #1847 from tursodatabase/lucio/1838
libsql: restart sync on lower frame_no from remote
2 parents 15849e4 + 6158cc2 commit 9241b00

File tree

3 files changed

+131
-12
lines changed

3 files changed

+131
-12
lines changed

libsql/src/local/database.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,29 @@ impl Database {
388388
#[cfg(feature = "sync")]
389389
/// Push WAL frames to remote.
390390
pub async fn push(&self) -> Result<crate::database::Replicated> {
391+
use crate::sync::SyncError;
392+
use crate::Error;
393+
394+
match self.try_push().await {
395+
Ok(rep) => Ok(rep),
396+
Err(Error::Sync(err)) => {
397+
// Retry the sync because we are ahead of the server and we need to push some older
398+
// frames.
399+
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
400+
err.downcast_ref::<SyncError>()
401+
{
402+
tracing::debug!("got InvalidPushFrameNo, retrying push");
403+
self.try_push().await
404+
} else {
405+
Err(Error::Sync(err))
406+
}
407+
}
408+
Err(e) => Err(e),
409+
}
410+
}
411+
412+
#[cfg(feature = "sync")]
413+
async fn try_push(&self) -> Result<crate::database::Replicated> {
391414
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
392415
let conn = self.connect()?;
393416

libsql/src/sync.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ pub enum SyncError {
4343
VerifyVersion(u32, u32),
4444
#[error("failed to verify metadata file hash: expected={0}, got={1}")]
4545
VerifyHash(u32, u32),
46+
#[error("server returned a lower frame_no: sent={0}, got={1}")]
47+
InvalidPushFrameNoLow(u32, u32),
48+
#[error("server returned a higher frame_no: sent={0}, got={1}")]
49+
InvalidPushFrameNoHigh(u32, u32),
4650
}
4751

4852
impl SyncError {
@@ -91,7 +95,10 @@ impl SyncContext {
9195
};
9296

9397
if let Err(e) = me.read_metadata().await {
94-
tracing::error!("failed to read sync metadata file: {}", e);
98+
tracing::error!(
99+
"failed to read sync metadata file, resetting back to defaults: {}",
100+
e
101+
);
95102
}
96103

97104
Ok(me)
@@ -115,6 +122,30 @@ impl SyncContext {
115122

116123
let durable_frame_num = self.push_with_retry(uri, frame, self.max_retries).await?;
117124

125+
if durable_frame_num > frame_no {
126+
tracing::error!(
127+
"server returned durable_frame_num larger than what we sent: sent={}, got={}",
128+
frame_no,
129+
durable_frame_num
130+
);
131+
132+
return Err(SyncError::InvalidPushFrameNoHigh(frame_no, durable_frame_num).into());
133+
}
134+
135+
if durable_frame_num < frame_no {
136+
// Update our knowledge of where the server is at frame wise.
137+
self.durable_frame_num = durable_frame_num;
138+
139+
tracing::debug!(
140+
"server returned durable_frame_num lower than what we sent: sent={}, got={}",
141+
frame_no,
142+
durable_frame_num
143+
);
144+
145+
// Return an error and expect the caller to re-call push with the updated state.
146+
return Err(SyncError::InvalidPushFrameNoLow(frame_no, durable_frame_num).into());
147+
}
148+
118149
tracing::debug!(?durable_frame_num, "frame successfully pushed");
119150

120151
// Update our last known max_frame_no from the server.
@@ -232,14 +263,20 @@ impl SyncContext {
232263
return Err(SyncError::VerifyVersion(metadata.version, METADATA_VERSION).into());
233264
}
234265

266+
tracing::debug!(
267+
"read sync metadata for db_path={:?}, metadata={:?}",
268+
self.db_path,
269+
metadata
270+
);
271+
235272
self.durable_frame_num = metadata.durable_frame_num;
236273
self.generation = metadata.generation;
237274

238275
Ok(())
239276
}
240277
}
241278

242-
#[derive(serde::Serialize, serde::Deserialize)]
279+
#[derive(serde::Serialize, serde::Deserialize, Debug)]
243280
struct MetadataJson {
244281
hash: u32,
245282
version: u32,

libsql/src/sync/test.rs

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use std::pin::Pin;
44
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
55
use std::sync::Arc;
66
use std::task::{Context, Poll};
7+
use std::time::Duration;
78
use tempfile::tempdir;
89
use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream};
910
use tower::Service;
10-
use std::time::Duration;
1111

1212
#[tokio::test]
1313
async fn test_sync_context_push_frame() {
@@ -30,10 +30,10 @@ async fn test_sync_context_push_frame() {
3030
// Push a frame and verify the response
3131
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
3232
sync_ctx.write_metadata().await.unwrap();
33-
assert_eq!(durable_frame, 1); // First frame should return max_frame_no = 1
33+
assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
3434

3535
// Verify internal state was updated
36-
assert_eq!(sync_ctx.durable_frame_num(), 1);
36+
assert_eq!(sync_ctx.durable_frame_num(), 0);
3737
assert_eq!(sync_ctx.generation(), 1);
3838
assert_eq!(server.frame_count(), 1);
3939
}
@@ -58,7 +58,7 @@ async fn test_sync_context_with_auth() {
5858

5959
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
6060
sync_ctx.write_metadata().await.unwrap();
61-
assert_eq!(durable_frame, 1);
61+
assert_eq!(durable_frame, 0);
6262
assert_eq!(server.frame_count(), 1);
6363
}
6464

@@ -84,8 +84,8 @@ async fn test_sync_context_multiple_frames() {
8484
let frame = Bytes::from(format!("frame data {}", i));
8585
let durable_frame = sync_ctx.push_one_frame(frame, 1, i).await.unwrap();
8686
sync_ctx.write_metadata().await.unwrap();
87-
assert_eq!(durable_frame, i + 1);
88-
assert_eq!(sync_ctx.durable_frame_num(), i + 1);
87+
assert_eq!(durable_frame, i);
88+
assert_eq!(sync_ctx.durable_frame_num(), i);
8989
assert_eq!(server.frame_count(), i + 1);
9090
}
9191
}
@@ -110,7 +110,7 @@ async fn test_sync_context_corrupted_metadata() {
110110
let frame = Bytes::from("test frame data");
111111
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
112112
sync_ctx.write_metadata().await.unwrap();
113-
assert_eq!(durable_frame, 1);
113+
assert_eq!(durable_frame, 0);
114114
assert_eq!(server.frame_count(), 1);
115115

116116
// Update metadata path to use -info instead of .meta
@@ -132,11 +132,69 @@ async fn test_sync_context_corrupted_metadata() {
132132
assert_eq!(sync_ctx.generation(), 1);
133133
}
134134

135+
#[tokio::test]
136+
async fn test_sync_restarts_with_lower_max_frame_no() {
137+
let _ = tracing_subscriber::fmt::try_init();
138+
139+
let server = MockServer::start();
140+
let temp_dir = tempdir().unwrap();
141+
let db_path = temp_dir.path().join("test.db");
142+
143+
// Create initial sync context and push a frame
144+
let sync_ctx = SyncContext::new(
145+
server.connector(),
146+
db_path.to_str().unwrap().to_string(),
147+
server.url(),
148+
None,
149+
)
150+
.await
151+
.unwrap();
152+
153+
let mut sync_ctx = sync_ctx;
154+
let frame = Bytes::from("test frame data");
155+
let durable_frame = sync_ctx.push_one_frame(frame.clone(), 1, 0).await.unwrap();
156+
sync_ctx.write_metadata().await.unwrap();
157+
assert_eq!(durable_frame, 0);
158+
assert_eq!(server.frame_count(), 1);
159+
160+
// Bump the durable frame num so that the next time we call the
161+
// server we think we are further ahead than the database we are talking to is.
162+
sync_ctx.durable_frame_num += 3;
163+
sync_ctx.write_metadata().await.unwrap();
164+
165+
// Create new sync context with corrupted metadata
166+
let mut sync_ctx = SyncContext::new(
167+
server.connector(),
168+
db_path.to_str().unwrap().to_string(),
169+
server.url(),
170+
None,
171+
)
172+
.await
173+
.unwrap();
174+
175+
// Verify that the context was set to new fake values.
176+
assert_eq!(sync_ctx.durable_frame_num(), 3);
177+
assert_eq!(sync_ctx.generation(), 1);
178+
179+
let frame_no = sync_ctx.durable_frame_num() + 1;
180+
// This push should fail because we are ahead of the server and thus should get an invalid
181+
// frame no error.
182+
sync_ctx
183+
.push_one_frame(frame.clone(), 1, frame_no)
184+
.await
185+
.unwrap_err();
186+
187+
let frame_no = sync_ctx.durable_frame_num() + 1;
188+
// This then should work because when the last one failed it updated our state of the server
189+
// durable_frame_num and we should then start writing from there.
190+
sync_ctx.push_one_frame(frame, 1, frame_no).await.unwrap();
191+
}
192+
135193
#[tokio::test]
136194
async fn test_sync_context_retry_on_error() {
137195
// Pause time to control it manually
138196
tokio::time::pause();
139-
197+
140198
let server = MockServer::start();
141199
let temp_dir = tempdir().unwrap();
142200
let db_path = temp_dir.path().join("test.db");
@@ -172,7 +230,7 @@ async fn test_sync_context_retry_on_error() {
172230
// Next attempt should succeed
173231
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
174232
sync_ctx.write_metadata().await.unwrap();
175-
assert_eq!(durable_frame, 1);
233+
assert_eq!(durable_frame, 0);
176234
assert_eq!(server.frame_count(), 1);
177235
}
178236

@@ -316,8 +374,9 @@ impl MockServer {
316374
let current_count = frame_count.fetch_add(1, Ordering::SeqCst);
317375

318376
if req.uri().path().contains("/sync/") {
377+
// Return the max_frame_no that has been accepted
319378
let response = serde_json::json!({
320-
"max_frame_no": current_count + 1
379+
"max_frame_no": current_count
321380
});
322381

323382
Ok::<_, hyper::Error>(

0 commit comments

Comments
 (0)