Skip to content

Commit 485fb9c

Browse files
committed
[FIXED] workqueue reset to 0 when blk file is zero-sized due to unflushed data on crash
Signed-off-by: souravagrawal <souravagrawal1111@gmail.com>
1 parent 1da9471 commit 485fb9c

File tree

2 files changed

+37
-5
lines changed

2 files changed

+37
-5
lines changed

server/filestore.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -475,18 +475,20 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
475475
}
476476

477477
// Check if our prior state remembers a last sequence past where we can see.
478-
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
478+
if prior.LastSeq > fs.state.LastSeq {
479479
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
480480
if fs.state.Msgs == 0 {
481481
fs.state.FirstSeq = fs.state.LastSeq + 1
482482
fs.state.FirstTime = time.Time{}
483483
}
484-
if _, err := fs.newMsgBlockForWrite(); err == nil {
485-
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
484+
if fs.ld != nil {
485+
if _, err := fs.newMsgBlockForWrite(); err == nil {
486+
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
487+
return nil, err
488+
}
489+
} else {
486490
return nil, err
487491
}
488-
} else {
489-
return nil, err
490492
}
491493
}
492494
// Since we recovered here, make sure to kick ourselves to write out our stream state.

server/filestore_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9555,3 +9555,33 @@ func TestFileStoreAllLastSeqs(t *testing.T) {
95559555
require_NoError(t, err)
95569556
require_True(t, reflect.DeepEqual(seqs, expected))
95579557
}
9558+
9559+
func TestFileStoreRecoverDoesNotResetStreamState(t *testing.T) {
9560+
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 5 * time.Second, Retention: WorkQueuePolicy}
9561+
fs, err := newFileStore(
9562+
FileStoreConfig{StoreDir: t.TempDir()},
9563+
cfg)
9564+
9565+
require_NoError(t, err)
9566+
defer fs.Stop()
9567+
9568+
subj, msg := "foo", []byte("Hello World")
9569+
toStore := 500
9570+
for i := 0; i < toStore; i++ {
9571+
_, _, err := fs.StoreMsg(subj, nil, msg, 0)
9572+
require_NoError(t, err)
9573+
}
9574+
time.Sleep(5 * time.Second)
9575+
fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil) //Expire all messages so stream does not hold any message, this is to simulate consumer consuming all messages.
9576+
require_NoError(t, err)
9577+
require_NoError(t, fs.Stop()) //To Ensure there is a state file created
9578+
require_True(t, len(fs.blks) == 1) //Since all messages are expire there should be only 1 blk file exist
9579+
os.Remove(fs.blks[0].mfn) // we can change it to have a consumer and consumer all messages too, but removing blk files will simulate same behavior
9580+
9581+
//Now at this point stream has only index.db file and no blk files as all are deleted. previously it used to reset the stream state to 0
9582+
// now it will use index.db to populate stream state if could not be recovered from blk files.
9583+
fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil)
9584+
require_NoError(t, err)
9585+
require_True(t, fs.state.FirstSeq|fs.state.LastSeq != 0)
9586+
9587+
}

0 commit comments

Comments
 (0)