diff --git a/bottomless/src/bottomless_wal.rs b/bottomless/src/bottomless_wal.rs index 98fd59ce16..9cbee2732a 100644 --- a/bottomless/src/bottomless_wal.rs +++ b/bottomless/src/bottomless_wal.rs @@ -74,6 +74,7 @@ impl WrapWal for BottomlessWalWrapper { let num_frames = wrapped.insert_frames(page_size, page_headers, size_after, is_commit, sync_flags)?; + let new_valid_valid_frame_index = wrapped.frames_in_wal(); let mut guard = self.replicator.blocking_lock(); match &mut *guard { @@ -83,11 +84,11 @@ impl WrapWal for BottomlessWalWrapper { std::process::abort() } replicator.register_last_valid_frame(last_valid_frame); - let new_valid_valid_frame_index = wrapped.frames_in_wal(); replicator.submit_frames(new_valid_valid_frame_index - last_valid_frame); } None => return Err(Error::new(SQLITE_IOERR_WRITE)), } + tracing::debug!("inserted {num_frames} with size_after={size_after}, is_commit={is_commit} ({last_valid_frame} -> {new_valid_valid_frame_index})"); Ok(num_frames) } diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index 1a67e92f84..ecb5576b43 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -8,6 +8,7 @@ mod completion_progress; pub mod read; pub mod replicator; pub mod transaction_cache; +mod utils; pub mod uuid_utils; mod wal; diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index cd37a70165..61323c9fd4 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1,6 +1,7 @@ use crate::backup::WalCopier; use crate::completion_progress::{CompletionProgress, SavepointTracker}; use crate::read::BatchReader; +use crate::utils; use crate::uuid_utils::decode_unix_timestamp; use crate::wal::WalFileReader; use anyhow::{anyhow, bail}; @@ -61,6 +62,7 @@ pub struct Replicator { pub page_size: usize, generation: Arc>, verify_crc: bool, + validate_integrity: bool, pub bucket: String, pub db_path: String, pub db_name: String, @@ -93,6 +95,8 @@ pub struct Options { /// If `true` when restoring, frames checksums will be verified prior their pages being flushed /// into the main database file. pub verify_crc: bool, + /// If `true` when restoring, db integrity will be verified before finishing the process + pub validate_integrity: bool, /// Kind of compression algorithm used on the WAL frames to be sent to S3. pub use_compression: CompressionKind, pub encryption_config: Option, @@ -216,6 +220,17 @@ impl Options { other ), }; + let validate_integrity = match env_var_or("LIBSQL_BOTTOMLESS_VALIDATE_INTEGRITY", true) + .to_lowercase() + .as_ref() + { + "yes" | "true" | "1" | "y" | "t" => true, + "no" | "false" | "0" | "n" | "f" => false, + other => bail!( + "Invalid LIBSQL_BOTTOMLESS_VALIDATE_INTEGRITY environment variable: {}", + other + ), + }; let skip_snapshot = match env_var_or("LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT", false) .to_lowercase() .as_ref() @@ -240,6 +255,7 @@ impl Options { db_id, create_bucket_if_not_exists: true, verify_crc, + validate_integrity, use_compression, encryption_config, max_batch_interval, @@ -377,7 +393,11 @@ impl Replicator { .send() .await { - tracing::error!("Failed to send {} to S3: {}", fpath, e); + utils::caution!( + "Failed to send {} to S3: {} (this will lead to gaps in frame ranges)", + fpath, + e, + ); } else { tokio::fs::remove_file(&fpath).await.unwrap(); let elapsed = Instant::now() - start; @@ -405,6 +425,7 @@ impl Replicator { flush_trigger: Some(flush_trigger), last_committed_frame_no, verify_crc: options.verify_crc, + validate_integrity: options.validate_integrity, db_path, db_name, snapshot_waiter, @@ -699,11 +720,11 @@ impl Replicator { ))); tokio::spawn(async move { if let Err(e) = request.send().await { - tracing::error!( - "Failed to store dependency between generations {} -> {}: {}", + utils::caution!( + "Failed to store dependency between generations {} -> {}: {} (this will lead to broken dependency chain)", prev, curr, - e + e, ); } else { tracing::trace!( @@ -1222,7 +1243,7 @@ impl Replicator { let elapsed = Instant::now() - start_ts; tracing::info!("Finished database restoration in {:?}", elapsed); tokio::fs::rename(&restore_path, &self.db_path).await?; - let _ = self.remove_wal_files().await; // best effort, WAL files may not exists + let _ = self.remove_wal_files(&self.db_name).await; // best effort, WAL files may not exists Ok(result) } Err(e) => { @@ -1450,6 +1471,7 @@ impl Replicator { utc_time: Option, db_path: &Path, ) -> Result { + tracing::debug!("restore wal from generation {generation}"); let encryption_config = self.encryption_config.clone(); let mut injector = libsql_replication::injector::SqliteInjector::new( db_path.to_path_buf(), @@ -1465,8 +1487,11 @@ impl Replicator { unsafe { v.set_len(page_size) }; v }; + let mut next_marker = None; let mut applied_wal_frame = false; + let mut last_injected_frame_no = 0; + let mut last_seen_frame_no = 0; 'restore_wal: loop { let mut list_request = self.list_objects().prefix(&prefix); if let Some(marker) = next_marker { @@ -1481,34 +1506,67 @@ impl Replicator { break; } - let mut last_received_frame_no = 0; for obj in objs { let key = obj .key() .ok_or_else(|| anyhow::anyhow!("Failed to get key for an object"))?; - tracing::debug!("Loading {}", key); - let (first_frame_no, last_frame_no, timestamp, compression_kind) = match Self::parse_frame_range(key) { Some(result) => result, None => { - if !key.ends_with(".gz") - && !key.ends_with(".zstd") - && !key.ends_with(".db") - && !key.ends_with(".meta") + // if this looks like a frame range from WAL - we must be able to parse + // range from it + if !key.ends_with(".meta") && !key.ends_with(".dep") && !key.ends_with(".changecounter") + && !key.ends_with("db.gz") + && !key.ends_with("db.zstd") + && !key.ends_with("db.raw") { - tracing::warn!("Failed to parse frame/page from key {}", key); + utils::caution!( + "Failed to parse frame/page: db_name={}, generation={}, key={}", + &self.db_name, + &generation, + key + ); } continue; } }; - if first_frame_no != last_received_frame_no + 1 { - tracing::warn!("Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process", - last_received_frame_no, first_frame_no); + tracing::debug!( + "loading object: key={}, first_frame_no={}, last_frame_no={}, timestamp={}, compression={}", + key, + first_frame_no, + last_frame_no, + timestamp, + compression_kind, + ); + if first_frame_no != last_injected_frame_no + 1 { + utils::caution!( + "Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process: db_name={}, generation={} (this can lead to inconsistent restore)", + last_injected_frame_no, + first_frame_no, + &self.db_name, + &generation + ); break; + } else if first_frame_no != last_seen_frame_no + 1 { + // there can be the case that bottomless has several overlapping frame ranges + // but one of them is empty - so we will ignore it (as it has no frames) + // for example: + // 9 bytes (empty zstd frame) .../000000000001-000000000001-1724236016.zstd + // 878 bytes .../000000000001-000000000001-1724236022.zstd + // but it's good to detect such cases + utils::caution!( + "detected series of non-consecutive frames: last_seen_frame_no={}, first_frame_no={}: db_name={}, generation={} (this can lead to inconsistent restore)", + last_seen_frame_no, + first_frame_no, + &self.db_name, + &generation + ); } + last_seen_frame_no = last_frame_no; + if let Some(frame) = last_consistent_frame { if last_frame_no > frame { tracing::warn!("Remote log contains frame {} larger than last consistent frame ({}), stopping the restoration process", @@ -1525,7 +1583,12 @@ impl Replicator { } } _ => { - tracing::trace!("Couldn't parse requested frame batch {} timestamp. Stopping recovery.", key); + utils::caution!( + "Couldn't parse requested frame batch {} timestamp. Stopping recovery: db_name={}, generation={}", + &self.db_name, + &generation, + key + ); break 'restore_wal; } } @@ -1539,7 +1602,7 @@ impl Replicator { ); while let Some(frame) = reader.next_frame_header().await? { - last_received_frame_no = reader.next_frame_no(); + last_injected_frame_no = reader.next_frame_no(); reader.next_page(&mut page_buf).await?; if self.verify_crc { checksum = frame.verify(checksum, &page_buf)?; @@ -1548,13 +1611,18 @@ impl Replicator { let checksum = (crc1 as u64) << 32 | crc2 as u64; let frame_to_inject = libsql_replication::frame::Frame::from_parts( &libsql_replication::frame::FrameHeader { - frame_no: (last_received_frame_no as u64).into(), + frame_no: (last_injected_frame_no as u64).into(), checksum: checksum.into(), page_no: frame.pgno().into(), size_after: frame.size_after().into(), }, page_buf.as_slice(), ); + tracing::debug!( + "ready to inject frame: pgno={}, sizeafter={}", + frame.pgno(), + frame.size_after() + ); let frame = RpcFrame { data: frame_to_inject.bytes(), timestamp: None, @@ -1577,13 +1645,48 @@ impl Replicator { break; } } + if self.validate_integrity { + if let Err(e) = injector.validate_integrity() { + utils::caution!( + "found integrity issues: {}, db_name={}, generation={}", + e, + &self.db_name, + &generation + ); + return Err(anyhow!("DB is not correct")); + } + } + // drop of injector will cause drop&close of last DB connection which will perform final + // WAL checkpoint of the DB + drop(injector); + + let db_path_str = db_path + .to_str() + .ok_or(anyhow!("failed to convert db path to string"))?; + let db_wal_file_path = format!("{}-wal", &db_path_str); + let db_wal_index_path = format!("{}-shm", &db_path_str); + let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?; + let has_wal_index = tokio::fs::try_exists(&db_wal_index_path).await?; + if has_wal_file || has_wal_index { + // restore process was not finished successfully as WAL wasn't transferred completely + tracing::error!( + "WAL wasn't transferred completely during restoration: db_name={}, generation={}", + &self.db_name, + &generation + ); + let _ = self + .remove_wal_files(&db_path_str) + .await + .inspect_err(|e| tracing::error!("unable to remove wal files: {}", e)); + return Err(anyhow!("WAL wasn't transferred completely")); + } Ok(applied_wal_frame) } - async fn remove_wal_files(&self) -> Result<()> { - tracing::debug!("Overwriting any existing WAL file: {}-wal", &self.db_path); - tokio::fs::remove_file(&format!("{}-wal", &self.db_path)).await?; - tokio::fs::remove_file(&format!("{}-shm", &self.db_path)).await?; + async fn remove_wal_files(&self, db_path: &str) -> Result<()> { + tracing::debug!("Remove any existing WAL file: {}-wal", &db_path); + tokio::fs::remove_file(&format!("{}-wal", &db_path)).await?; + tokio::fs::remove_file(&format!("{}-shm", &db_path)).await?; Ok(()) } @@ -1672,7 +1775,11 @@ impl Replicator { .send() .await { - tracing::error!("Failed to send {} to S3: {}", key, e); + utils::caution!( + "Failed to send {} to S3: {} (this will lead to gaps in the frame ranges)", + key, + e, + ); } else { tokio::fs::remove_file(&fpath).await.unwrap(); tracing::trace!("Uploaded to S3: {}", key); diff --git a/bottomless/src/utils.rs b/bottomless/src/utils.rs new file mode 100644 index 0000000000..4622015f41 --- /dev/null +++ b/bottomless/src/utils.rs @@ -0,0 +1,10 @@ +macro_rules! caution { + ($msg:expr) => { + tracing::error!(concat!("BOTTOMLESS CAUTION: ", $msg)); + }; + ($msg:expr, $($arg:tt)*) => { + tracing::error!(concat!("BOTTOMLESS CAUTION: ", $msg), $($arg)*); + }; +} + +pub(crate) use caution; diff --git a/libsql-replication/src/injector/sqlite_injector/mod.rs b/libsql-replication/src/injector/sqlite_injector/mod.rs index f6ce2aa89f..bf63be19d0 100644 --- a/libsql-replication/src/injector/sqlite_injector/mod.rs +++ b/libsql-replication/src/injector/sqlite_injector/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::{collections::VecDeque, path::PathBuf}; use parking_lot::Mutex; +use rusqlite::ffi::SQLITE_ERROR; use rusqlite::OpenFlags; use tokio::task::spawn_blocking; @@ -65,6 +66,10 @@ impl SqliteInjector { inner: Arc::new(Mutex::new(inner)), }) } + + pub fn validate_integrity(&mut self) -> Result<()> { + self.inner.lock().validate_integrity() + } } pub(in super::super) struct SqliteInjectorInner { @@ -124,6 +129,23 @@ impl SqliteInjectorInner { }) } + pub fn validate_integrity(&mut self) -> Result<()> { + self.connection + .lock() + .pragma_query(None, "integrity_check", |row| { + let row: String = row.get(0)?; + if row != "ok" { + Err(rusqlite::Error::SqliteFailure( + rusqlite::ffi::Error::new(SQLITE_ERROR), + Some(format!("found integrity issues: {row}")), + )) + } else { + Ok(()) + } + })?; + Ok(()) + } + /// Inject a frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)). pub fn inject_frame(&mut self, frame: Frame) -> Result, Error> { let frame_close_txn = frame.header().size_after.get() != 0; diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index 1fb75db63c..a78deef252 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -114,6 +114,7 @@ pub async fn metastore_connection_maker( let options = bottomless::replicator::Options { create_bucket_if_not_exists: true, verify_crc: true, + validate_integrity: false, use_compression: CompressionKind::None, encryption_config: None, aws_endpoint: Some(config.bucket_endpoint), diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index 5f3015e11e..d9eb9faa12 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -10,6 +10,7 @@ use s3s::service::S3ServiceBuilder; use std::net::{SocketAddr, ToSocketAddrs}; use std::path::PathBuf; use std::sync::Once; +use tokio::task::JoinError; use tokio::time::sleep; use tokio::time::Duration; use url::Url; @@ -61,7 +62,7 @@ async fn start_s3_server() { } /// returns a future that once polled will shutdown the server and wait for cleanup -fn start_db(step: u32, server: Server) -> impl Future { +fn start_db(step: u32, server: Server) -> impl Future> { let notify = server.shutdown.clone(); let handle = tokio::spawn(async move { if let Err(e) = server.start().await { @@ -71,7 +72,7 @@ fn start_db(step: u32, server: Server) -> impl Future { async move { notify.notify_waiters(); - handle.await.unwrap(); + handle.await } } @@ -176,7 +177,7 @@ async fn backup_restore() { sleep(Duration::from_secs(2)).await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -195,7 +196,7 @@ async fn backup_restore() { assert_updates(&connection_addr, ROWS, OPS, "A").await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -213,7 +214,7 @@ async fn backup_restore() { // wait for WAL to backup sleep(Duration::from_secs(2)).await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -228,7 +229,7 @@ async fn backup_restore() { assert_updates(&connection_addr, ROWS, OPS, "B").await; - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -247,7 +248,260 @@ async fn backup_restore() { assert_updates(&connection_addr, ROWS, OPS, "B").await; - db_job.await; + db_job.await.unwrap(); + drop(cleaner); + } +} + +async fn list_bucket(bucket: &str) -> Vec { + let client = s3_client().await.expect("failed to create s3 client"); + let objects = client + .list_objects() + .bucket(bucket) + .prefix("") + .send() + .await + .expect("failed to list objects"); + objects + .contents() + .iter() + .map(|x| String::from(x.key().unwrap())) + .collect() +} + +#[tokio::test] +async fn restore_from_partial_db() { + let _ = tracing_subscriber::fmt::try_init(); + + start_s3_server().await; + + const DB_ID: &str = "partialbackup"; + const BUCKET: &str = "partialbackup"; + const PATH: &str = "partialbackup.sqld"; + const PORT: u16 = 15003; + + let _ = S3BucketCleaner::new(BUCKET).await; + assert_bucket_occupancy(BUCKET, true).await; + + let listener_addr = format!("0.0.0.0:{}", PORT) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + let conn = Url::parse(&format!("http://localhost:{}", PORT)).unwrap(); + let options = bottomless::replicator::Options { + db_id: Some(DB_ID.to_string()), + create_bucket_if_not_exists: true, + verify_crc: true, + use_compression: bottomless::replicator::CompressionKind::None, + bucket_name: BUCKET.to_string(), + max_batch_interval: Duration::from_millis(10), + ..bottomless::replicator::Options::from_env().unwrap() + }; + let make_server = || async { configure_server(&options, listener_addr, PATH).await }; + { + tracing::info!( + "---STEP 1: create db, write rows, remove random S3 files to create effect of partial backup---" + ); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(1, make_server().await); + + sleep(Duration::from_secs(2)).await; + + let _ = sql( + &conn, + ["CREATE TABLE IF NOT EXISTS t(id INT PRIMARY KEY, name TEXT, payload BLOB);"], + ) + .await + .unwrap(); + for i in 0..128 { + sql( + &conn, + [format!("INSERT INTO t VALUES({i}, '{i}', zeroblob(4096))")], + ) + .await + .expect("SQL query failed"); + } + + tracing::info!("Ready to remove files from S3"); + let client = s3_client().await.expect("failed to create s3 client"); + let mut i = 0; + for key in list_bucket(BUCKET).await { + // delete full snapshot and random wal frame ranges + let should_delete = key.ends_with(".changecounter") + || key.ends_with(".dep") + || key.ends_with("db.raw") + || i % 10 == 0; + + if should_delete { + client + .delete_object() + .bucket(BUCKET) + .key(key) + .send() + .await + .expect("failed to delete object"); + } + i += 1; + } + + db_job.await.unwrap(); + drop(cleaner); + } + + { + sleep(Duration::from_secs(2)).await; + + tracing::info!("---STEP 2: recreate database, check that it is unable to start ---"); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(2, make_server().await); + sleep(Duration::from_secs(2)).await; + + let result = sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.unwrap(); + let count = result + .first() + .unwrap() + .clone() + .into_result_set() + .unwrap() + .rows[0] + .cells["cnt"] + .clone(); + if let Value::Integer(x) = count { + assert!(0 < x && x < 128); + } else { + assert!(false); + } + db_job.await.unwrap(); + drop(cleaner); + } +} + +#[tokio::test] +async fn do_not_restore_from_corrupted_db() { + let _ = tracing_subscriber::fmt::try_init(); + + start_s3_server().await; + + const DB_ID: &str = "corruptedbackup"; + const BUCKET: &str = "corruptedbackup"; + const PATH: &str = "corruptedbackup.sqld"; + const PORT: u16 = 15004; + + let _ = S3BucketCleaner::new(BUCKET).await; + assert_bucket_occupancy(BUCKET, true).await; + + let listener_addr = format!("0.0.0.0:{}", PORT) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + let conn = Url::parse(&format!("http://localhost:{}", PORT)).unwrap(); + let options = bottomless::replicator::Options { + db_id: Some(DB_ID.to_string()), + create_bucket_if_not_exists: true, + verify_crc: false, + validate_integrity: true, + use_compression: bottomless::replicator::CompressionKind::None, + bucket_name: BUCKET.to_string(), + max_batch_interval: Duration::from_millis(10), + ..bottomless::replicator::Options::from_env().unwrap() + }; + let make_server = || async { configure_server(&options, listener_addr, PATH).await }; + { + tracing::info!("---STEP 1: create db, write rows, corrupt random S3 files ---"); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(1, make_server().await); + + sleep(Duration::from_secs(2)).await; + + let _ = sql( + &conn, + ["CREATE TABLE IF NOT EXISTS t(id INT PRIMARY KEY, name TEXT, payload BLOB);"], + ) + .await + .unwrap(); + for i in 0..128 { + sql( + &conn, + [format!("INSERT INTO t VALUES({i}, '{i}', zeroblob(128))")], + ) + .await + .expect("SQL query failed"); + } + + tracing::info!("Ready to remove files from S3"); + let client = s3_client().await.expect("failed to create s3 client"); + for key in list_bucket(BUCKET).await { + // delete full snapshot files + let should_delete = + key.ends_with(".changecounter") || key.ends_with(".dep") || key.ends_with("db.raw"); + if should_delete { + client + .delete_object() + .bucket(BUCKET) + .key(&key) + .send() + .await + .expect("failed to delete object"); + } else if key.ends_with(".raw") { + tracing::info!("corrupt frame range: {key}"); + // corrupt random wal frame range + let response = client + .get_object() + .bucket(BUCKET) + .key(&key) + .send() + .await + .expect("failed to read object"); + let mut bytes: Vec = response + .body + .collect() + .await + .expect("failed to read body") + .into_bytes() + .into(); + let single_frame_size = 24 + 4096; + assert!(bytes.len() % single_frame_size == 0); + for frame in 0..bytes.len() / single_frame_size { + let page_number = u32::from_be_bytes( + bytes[frame * single_frame_size..frame * single_frame_size + 4] + .try_into() + .unwrap(), + ); + if page_number <= 1 { + continue; + } + tracing::info!("corrupting page {page_number}"); + for b in frame * single_frame_size + 24..(frame + 1) * single_frame_size { + bytes[b] = 255; + } + } + client + .put_object() + .bucket(BUCKET) + .key(&key) + .body(bytes.into()) + .send() + .await + .expect("failed to put body"); + } + } + + db_job.await.unwrap(); + drop(cleaner); + } + + { + sleep(Duration::from_secs(2)).await; + + tracing::info!("---STEP 2: recreate database, check that it is unable to start ---"); + let cleaner = DbFileCleaner::new(PATH); + let db_job = start_db(2, make_server().await); + sleep(Duration::from_secs(2)).await; + + assert!(sql(&conn, ["SELECT COUNT(*) as cnt FROM t"]).await.is_err()); + assert!(db_job.await.is_err()); drop(cleaner); } } @@ -340,7 +594,7 @@ async fn rollback_restore() { "rollback value should not be updated" ); - db_job.await; + db_job.await.unwrap(); drop(cleaner); } @@ -369,7 +623,7 @@ async fn rollback_restore() { ] ); - db_job.await; + db_job.await.unwrap(); drop(cleaner); } }