@@ -47,6 +47,12 @@ pub struct LogStoreWriteCommand {
4747 ack : Option < Ack > ,
4848}
4949
50+ impl LogStoreWriteCommand {
51+ pub fn requires_sync_write ( & self ) -> bool {
52+ matches ! ( & self . metadata_update, Some ( MetadataUpdate :: Seal ) )
53+ }
54+ }
55+
5056enum DataUpdate {
5157 StoreBatch { store_message : Store } ,
5258 TrimLogRecords { trim_point : LogletOffset } ,
@@ -63,6 +69,8 @@ pub(crate) struct LogStoreWriter {
6369 batch_acks_buf : Vec < Ack > ,
6470 buffer : BytesMut ,
6571 health_status : HealthStatus < LogServerStatus > ,
72+ sync_write_is_required : bool ,
73+ // add WriteBatch
6674}
6775
6876impl LogStoreWriter {
@@ -72,6 +80,7 @@ impl LogStoreWriter {
7280 batch_acks_buf : Vec :: default ( ) ,
7381 buffer : BytesMut :: with_capacity ( INITIAL_SERDE_BUFFER_SIZE ) ,
7482 health_status,
83+ sync_write_is_required : false ,
7584 }
7685 }
7786
@@ -128,6 +137,8 @@ impl LogStoreWriter {
128137 . expect ( "metadata cf exists" ) ;
129138
130139 for command in commands {
140+ self . sync_write_is_required |= command. requires_sync_write ( ) ;
141+
131142 match command. data_update {
132143 Some ( DataUpdate :: StoreBatch { store_message } ) => Self :: process_store_message (
133144 store_message,
@@ -228,8 +239,16 @@ impl LogStoreWriter {
228239
229240 async fn commit ( & mut self , opts : & LogServerOptions , write_batch : WriteBatch ) {
230241 let mut write_opts = rocksdb:: WriteOptions :: new ( ) ;
231- write_opts. disable_wal ( opts. rocksdb . rocksdb_disable_wal ( ) ) ;
232- write_opts. set_sync ( !opts. rocksdb_disable_wal_fsync ( ) ) ;
242+ if self . sync_write_is_required {
243+ write_opts. disable_wal ( false ) ;
244+ write_opts. set_sync ( true ) ;
245+ } else {
246+ write_opts. disable_wal ( opts. rocksdb . rocksdb_disable_wal ( ) ) ;
247+ write_opts. set_sync ( !opts. rocksdb_disable_wal_fsync ( ) ) ;
248+ }
249+
250+ // Reset the flag until we see a command that requires a sync write again.
251+ self . sync_write_is_required = false ;
233252 // hint to rocksdb to insert the memtable position hint for the batch, our writes per batch
234253 // are mostly ordered.
235254 write_opts. set_memtable_insert_hint_per_batch ( true ) ;
0 commit comments