Skip to content

Commit 6a82223

Browse files
authored
Merge pull request #1858 from tursodatabase/wal-pull
libsql: WAL pull support
2 parents 7843e86 + a2a6fc5 commit 6a82223

File tree

5 files changed

+197
-3
lines changed

5 files changed

+197
-3
lines changed
+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Example of using a offline writes with libSQL.
2+
3+
use libsql::Builder;
4+
5+
#[tokio::main]
6+
async fn main() {
7+
tracing_subscriber::fmt::init();
8+
9+
// The local database path where the data will be stored.
10+
let db_path = std::env::var("LIBSQL_DB_PATH")
11+
.map_err(|_| {
12+
eprintln!(
13+
"Please set the LIBSQL_DB_PATH environment variable to set to local database path."
14+
)
15+
})
16+
.unwrap();
17+
18+
// The remote sync URL to use.
19+
let sync_url = std::env::var("LIBSQL_SYNC_URL")
20+
.map_err(|_| {
21+
eprintln!(
22+
"Please set the LIBSQL_SYNC_URL environment variable to set to remote sync URL."
23+
)
24+
})
25+
.unwrap();
26+
27+
// The authentication token to use.
28+
let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or("".to_string());
29+
30+
let db_builder = Builder::new_synced_database(db_path, sync_url, auth_token);
31+
32+
let db = match db_builder.build().await {
33+
Ok(db) => db,
34+
Err(error) => {
35+
eprintln!("Error connecting to remote sync server: {}", error);
36+
return;
37+
}
38+
};
39+
40+
println!("Syncing database from remote...");
41+
db.sync().await.unwrap();
42+
43+
let conn = db.connect().unwrap();
44+
let mut results = conn
45+
.query("SELECT * FROM guest_book_entries", ())
46+
.await
47+
.unwrap();
48+
println!("Guest book entries:");
49+
while let Some(row) = results.next().await.unwrap() {
50+
let text: String = row.get(0).unwrap();
51+
println!(" {}", text);
52+
}
53+
54+
println!("Done!");
55+
}

libsql/src/database.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ cfg_replication! {
373373
#[cfg(feature = "replication")]
374374
DbType::Sync { db, encryption_config: _ } => db.sync().await,
375375
#[cfg(feature = "sync")]
376-
DbType::Offline { db } => db.push().await,
376+
DbType::Offline { db } => db.sync_offline().await,
377377
_ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
378378
}
379379
}

libsql/src/local/connection.rs

+40
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,46 @@ impl Connection {
482482

483483
Ok(buf)
484484
}
485+
486+
pub(crate) fn wal_insert_begin(&self) -> Result<()> {
487+
let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_begin(self.handle()) };
488+
if rc != 0 {
489+
return Err(crate::errors::Error::SqliteFailure(
490+
rc as std::ffi::c_int,
491+
format!("wal_insert_begin failed"),
492+
));
493+
}
494+
Ok(())
495+
}
496+
497+
pub(crate) fn wal_insert_end(&self) -> Result<()> {
498+
let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_end(self.handle()) };
499+
if rc != 0 {
500+
return Err(crate::errors::Error::SqliteFailure(
501+
rc as std::ffi::c_int,
502+
format!("wal_insert_end failed"),
503+
));
504+
}
505+
Ok(())
506+
}
507+
508+
pub(crate) fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> {
509+
let rc = unsafe {
510+
libsql_sys::ffi::libsql_wal_insert_frame(
511+
self.handle(),
512+
frame.len() as u32,
513+
frame.as_ptr() as *mut std::ffi::c_void,
514+
0
515+
)
516+
};
517+
if rc != 0 {
518+
return Err(crate::errors::Error::SqliteFailure(
519+
rc as std::ffi::c_int,
520+
format!("wal_insert_frame failed"),
521+
));
522+
}
523+
Ok(())
524+
}
485525
}
486526

487527
impl fmt::Debug for Connection {

libsql/src/local/database.rs

+37-2
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ impl Database {
386386
}
387387

388388
#[cfg(feature = "sync")]
389-
/// Push WAL frames to remote.
390-
pub async fn push(&self) -> Result<crate::database::Replicated> {
389+
/// Sync WAL frames to remote.
390+
pub async fn sync_offline(&self) -> Result<crate::database::Replicated> {
391391
use crate::sync::SyncError;
392392
use crate::Error;
393393

@@ -425,6 +425,10 @@ impl Database {
425425

426426
let max_frame_no = conn.wal_frame_count();
427427

428+
if max_frame_no == 0 {
429+
return self.try_pull(&mut sync_ctx).await;
430+
}
431+
428432
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
429433
let start_frame_no = sync_ctx.durable_frame_num() + 1;
430434
let end_frame_no = max_frame_no;
@@ -448,6 +452,10 @@ impl Database {
448452

449453
sync_ctx.write_metadata().await?;
450454

455+
if start_frame_no > end_frame_no {
456+
return self.try_pull(&mut sync_ctx).await;
457+
}
458+
451459
// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
452460
// than what we have stored here.
453461
let frame_count = end_frame_no - start_frame_no + 1;
@@ -457,6 +465,33 @@ impl Database {
457465
})
458466
}
459467

468+
#[cfg(feature = "sync")]
469+
async fn try_pull(&self, sync_ctx: &mut SyncContext) -> Result<crate::database::Replicated> {
470+
let generation = sync_ctx.generation();
471+
let mut frame_no = sync_ctx.durable_frame_num() + 1;
472+
let conn = self.connect()?;
473+
conn.wal_insert_begin()?;
474+
loop {
475+
match sync_ctx.pull_one_frame(generation, frame_no).await {
476+
Ok(frame) => {
477+
conn.wal_insert_frame(&frame)?;
478+
frame_no += 1;
479+
}
480+
Err(e) => {
481+
println!("pull_one_frame error: {:?}", e);
482+
break;
483+
}
484+
}
485+
486+
}
487+
conn.wal_insert_end()?;
488+
sync_ctx.write_metadata().await?;
489+
Ok(crate::database::Replicated {
490+
frame_no: None,
491+
frames_synced: 1,
492+
})
493+
}
494+
460495
pub(crate) fn path(&self) -> &str {
461496
&self.db_path
462497
}

libsql/src/sync.rs

+64
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub enum SyncError {
4747
InvalidPushFrameNoLow(u32, u32),
4848
#[error("server returned a higher frame_no: sent={0}, got={1}")]
4949
InvalidPushFrameNoHigh(u32, u32),
50+
#[error("failed to pull frame: status={0}, error={1}")]
51+
PullFrame(StatusCode, String),
5052
}
5153

5254
impl SyncError {
@@ -104,6 +106,21 @@ impl SyncContext {
104106
Ok(me)
105107
}
106108

109+
#[tracing::instrument(skip(self))]
110+
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Bytes> {
111+
let uri = format!(
112+
"{}/sync/{}/{}/{}",
113+
self.sync_url,
114+
generation,
115+
frame_no,
116+
frame_no + 1
117+
);
118+
tracing::debug!("pulling frame");
119+
let frame = self.pull_with_retry(uri, self.max_retries).await?;
120+
self.durable_frame_num = frame_no;
121+
Ok(frame)
122+
}
123+
107124
#[tracing::instrument(skip(self, frame))]
108125
pub(crate) async fn push_one_frame(
109126
&mut self,
@@ -215,6 +232,53 @@ impl SyncContext {
215232
}
216233
}
217234

235+
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Bytes> {
236+
let mut nr_retries = 0;
237+
loop {
238+
let mut req = http::Request::builder().method("GET").uri(uri.clone());
239+
240+
match &self.auth_token {
241+
Some(auth_token) => {
242+
req = req.header("Authorization", auth_token);
243+
}
244+
None => {}
245+
}
246+
247+
let req = req.body(Body::empty())
248+
.expect("valid request");
249+
250+
let res = self
251+
.client
252+
.request(req)
253+
.await
254+
.map_err(SyncError::HttpDispatch)?;
255+
256+
if res.status().is_success() {
257+
let frame = hyper::body::to_bytes(res.into_body())
258+
.await
259+
.map_err(SyncError::HttpBody)?;
260+
return Ok(frame);
261+
}
262+
// If we've retried too many times or the error is not a server error,
263+
// return the error.
264+
if nr_retries > max_retries || !res.status().is_server_error() {
265+
let status = res.status();
266+
267+
let res_body = hyper::body::to_bytes(res.into_body())
268+
.await
269+
.map_err(SyncError::HttpBody)?;
270+
271+
let msg = String::from_utf8_lossy(&res_body[..]);
272+
273+
return Err(SyncError::PullFrame(status, msg.to_string()).into());
274+
}
275+
276+
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
277+
tokio::time::sleep(delay).await;
278+
nr_retries += 1;
279+
}
280+
}
281+
218282
pub(crate) fn durable_frame_num(&self) -> u32 {
219283
self.durable_frame_num
220284
}

0 commit comments

Comments
 (0)