Skip to content

Commit 7c06a4f

Browse files
[FIXED] workqueue reset to 0 when blk file is zero-sized due to unflushed data on crash (#6882)
I encountered an issue where a Workqueue stream’s first and last sequence numbers were unexpectedly reset to 0 following an abrupt termination of the NATS server. Interestingly, the consumer remained fully caught up with messages and retained its expected state even after the crash, but the stream itself appeared to have been reset. I was able to retrieve a backup of the data after the crash and debug it locally. During analysis, I found that new msgs had not been flushed to disk, resulting in a zero-sized blk file I believe. As a result, during recovery, the stream state remained at zero and the index.db could not be used to reconstruct the state Resolves : #6881 Signed-off-by: souravagrawal <souravagrawal1111@gmail.com>
2 parents d50a093 + 485fb9c commit 7c06a4f

File tree

2 files changed

+37
-5
lines changed

2 files changed

+37
-5
lines changed

server/filestore.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -475,18 +475,20 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
475475
}
476476

477477
// Check if our prior state remembers a last sequence past where we can see.
478-
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
478+
if prior.LastSeq > fs.state.LastSeq {
479479
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
480480
if fs.state.Msgs == 0 {
481481
fs.state.FirstSeq = fs.state.LastSeq + 1
482482
fs.state.FirstTime = time.Time{}
483483
}
484-
if _, err := fs.newMsgBlockForWrite(); err == nil {
485-
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
484+
if fs.ld != nil {
485+
if _, err := fs.newMsgBlockForWrite(); err == nil {
486+
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
487+
return nil, err
488+
}
489+
} else {
486490
return nil, err
487491
}
488-
} else {
489-
return nil, err
490492
}
491493
}
492494
// Since we recovered here, make sure to kick ourselves to write out our stream state.

server/filestore_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9555,3 +9555,33 @@ func TestFileStoreAllLastSeqs(t *testing.T) {
95559555
require_NoError(t, err)
95569556
require_True(t, reflect.DeepEqual(seqs, expected))
95579557
}
9558+
9559+
func TestFileStoreRecoverDoesNotResetStreamState(t *testing.T) {
9560+
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 5 * time.Second, Retention: WorkQueuePolicy}
9561+
fs, err := newFileStore(
9562+
FileStoreConfig{StoreDir: t.TempDir()},
9563+
cfg)
9564+
9565+
require_NoError(t, err)
9566+
defer fs.Stop()
9567+
9568+
subj, msg := "foo", []byte("Hello World")
9569+
toStore := 500
9570+
for i := 0; i < toStore; i++ {
9571+
_, _, err := fs.StoreMsg(subj, nil, msg, 0)
9572+
require_NoError(t, err)
9573+
}
9574+
time.Sleep(5 * time.Second)
9575+
fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil) //Expire all messages so stream does not hold any message, this is to simulate consumer consuming all messages.
9576+
require_NoError(t, err)
9577+
require_NoError(t, fs.Stop()) //To Ensure there is a state file created
9578+
require_True(t, len(fs.blks) == 1) //Since all messages are expire there should be only 1 blk file exist
9579+
os.Remove(fs.blks[0].mfn) // we can change it to have a consumer and consumer all messages too, but removing blk files will simulate same behavior
9580+
9581+
//Now at this point stream has only index.db file and no blk files as all are deleted. previously it used to reset the stream state to 0
9582+
// now it will use index.db to populate stream state if could not be recovered from blk files.
9583+
fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil)
9584+
require_NoError(t, err)
9585+
require_True(t, fs.state.FirstSeq|fs.state.LastSeq != 0)
9586+
9587+
}

0 commit comments

Comments
 (0)