@@ -275,6 +275,8 @@ const (
275
275
wiThresh = int64 (30 * time .Second )
276
276
// Time threshold to write index info for non-FIFO cases
277
277
winfThresh = int64 (2 * time .Second )
278
+ // Checksum size for hash for msg records.
279
+ recordHashSize = 8
278
280
)
279
281
280
282
func newFileStore (fcfg FileStoreConfig , cfg StreamConfig ) (* fileStore , error ) {
@@ -971,6 +973,10 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
971
973
func (mb * msgBlock ) rebuildStateLocked () (* LostStreamData , error ) {
972
974
startLastSeq := mb .last .seq
973
975
976
+ // Remove the .fss file and clear any cache we have set.
977
+ mb .clearCacheAndOffset ()
978
+ mb .removePerSubjectInfoLocked ()
979
+
974
980
buf , err := mb .loadBlock (nil )
975
981
if err != nil || len (buf ) == 0 {
976
982
var ld * LostStreamData
@@ -996,9 +1002,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
996
1002
mb .last .seq , mb .last .ts = 0 , 0
997
1003
firstNeedsSet := true
998
1004
999
- // Remove the .fss file from disk.
1000
- mb .removePerSubjectInfoLocked ()
1001
-
1002
1005
// Check if we need to decrypt.
1003
1006
if mb .bek != nil && len (buf ) > 0 {
1004
1007
// Recreate to reset counter.
@@ -1070,12 +1073,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
1070
1073
rl &^= hbit
1071
1074
dlen := int (rl ) - msgHdrSize
1072
1075
// Do some quick sanity checks here.
1073
- if dlen < 0 || int (slen ) > (dlen - 8 ) || dlen > int (rl ) || rl > rlBadThresh {
1074
- truncate (index )
1075
- return gatherLost (lbuf - index ), errBadMsg
1076
- }
1077
-
1078
- if index + rl > lbuf {
1076
+ if dlen < 0 || int (slen ) > (dlen - recordHashSize ) || dlen > int (rl ) || index + rl > lbuf || rl > rlBadThresh {
1079
1077
truncate (index )
1080
1078
return gatherLost (lbuf - index ), errBadMsg
1081
1079
}
@@ -1091,15 +1089,17 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
1091
1089
addToDmap (seq )
1092
1090
}
1093
1091
index += rl
1094
- mb .last .seq = seq
1095
- mb .last .ts = ts
1092
+ if seq >= mb .first .seq {
1093
+ mb .last .seq = seq
1094
+ mb .last .ts = ts
1095
+ }
1096
1096
continue
1097
1097
}
1098
1098
1099
1099
// This is for when we have index info that adjusts for deleted messages
1100
1100
// at the head. So the first.seq will be already set here. If this is larger
1101
1101
// replace what we have with this seq.
1102
- if firstNeedsSet && seq > mb .first .seq {
1102
+ if firstNeedsSet && seq >= mb .first .seq {
1103
1103
firstNeedsSet , mb .first .seq , mb .first .ts = false , seq , ts
1104
1104
}
1105
1105
@@ -1119,12 +1119,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
1119
1119
hh .Write (hdr [4 :20 ])
1120
1120
hh .Write (data [:slen ])
1121
1121
if hasHeaders {
1122
- hh .Write (data [slen + 4 : dlen - 8 ])
1122
+ hh .Write (data [slen + 4 : dlen - recordHashSize ])
1123
1123
} else {
1124
- hh .Write (data [slen : dlen - 8 ])
1124
+ hh .Write (data [slen : dlen - recordHashSize ])
1125
1125
}
1126
1126
checksum := hh .Sum (nil )
1127
- if ! bytes .Equal (checksum , data [len (data )- 8 :]) {
1127
+ if ! bytes .Equal (checksum , data [len (data )- recordHashSize :]) {
1128
1128
truncate (index )
1129
1129
return gatherLost (lbuf - index ), errBadMsg
1130
1130
}
@@ -1165,6 +1165,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
1165
1165
mb .last .seq = mb .first .seq - 1
1166
1166
}
1167
1167
1168
+ // Update our fss file if needed.
1169
+ if len (mb .fss ) > 0 {
1170
+ mb .writePerSubjectInfo ()
1171
+ }
1172
+
1168
1173
return nil , nil
1169
1174
}
1170
1175
@@ -2598,14 +2603,42 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
2598
2603
fs .scb = nil
2599
2604
defer func () { fs .scb = cb }()
2600
2605
2606
+ var numMsgs uint64
2607
+
2601
2608
// collect all that are not correct.
2602
2609
needAttention := make (map [string ]* psi )
2603
2610
for subj , psi := range fs .psim {
2611
+ numMsgs += psi .total
2604
2612
if psi .total > maxMsgsPer {
2605
2613
needAttention [subj ] = psi
2606
2614
}
2607
2615
}
2608
2616
2617
+ // We had an issue with a use case where psim (and hence fss) were correct but idx was not, and that mismatch was not properly being caught.
2618
+ // So do a quick sanity check here. If we detect a skew do a rebuild then re-check.
2619
+ if numMsgs != fs .state .Msgs {
2620
+ // Clear any global subject state.
2621
+ fs .psim = make (map [string ]* psi )
2622
+ for _ , mb := range fs .blks {
2623
+ mb .removeIndexFile ()
2624
+ ld , err := mb .rebuildState ()
2625
+ mb .writeIndexInfo ()
2626
+ if err != nil && ld != nil {
2627
+ fs .addLostData (ld )
2628
+ }
2629
+ fs .populateGlobalPerSubjectInfo (mb )
2630
+ }
2631
+ // Rebuild fs state too.
2632
+ fs .rebuildStateLocked (nil )
2633
+ // Need to redo blocks that need attention.
2634
+ needAttention = make (map [string ]* psi )
2635
+ for subj , psi := range fs .psim {
2636
+ if psi .total > maxMsgsPer {
2637
+ needAttention [subj ] = psi
2638
+ }
2639
+ }
2640
+ }
2641
+
2609
2642
// Collect all the msgBlks we alter.
2610
2643
blks := make (map [* msgBlock ]struct {})
2611
2644
@@ -3050,8 +3083,7 @@ func (mb *msgBlock) compact() {
3050
3083
return
3051
3084
}
3052
3085
3053
- // Close cache and index file and wipe delete map, then rebuild.
3054
- mb .clearCacheAndOffset ()
3086
+ // Remove index file and wipe delete map, then rebuild.
3055
3087
mb .removeIndexFileLocked ()
3056
3088
mb .deleteDmap ()
3057
3089
mb .rebuildStateLocked ()
@@ -3077,6 +3109,11 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
3077
3109
bi := mb .cache .idx [slot ]
3078
3110
ri , hashChecked := (bi &^ hbit ), (bi & hbit ) != 0
3079
3111
3112
+ // If this is a deleted slot return here.
3113
+ if bi == dbit {
3114
+ return 0 , 0 , false , errDeletedMsg
3115
+ }
3116
+
3080
3117
// Determine record length
3081
3118
var rl uint32
3082
3119
if len (mb .cache .idx ) > slot + 1 {
@@ -4022,7 +4059,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
4022
4059
func (mb * msgBlock ) indexCacheBuf (buf []byte ) error {
4023
4060
var le = binary .LittleEndian
4024
4061
4025
- var fseq uint64
4062
+ var fseq , pseq uint64
4026
4063
var idx []uint32
4027
4064
var index uint32
4028
4065
@@ -4055,23 +4092,39 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
4055
4092
dlen := int (rl ) - msgHdrSize
4056
4093
4057
4094
// Do some quick sanity checks here.
4058
- if dlen < 0 || int (slen ) > dlen || dlen > int (rl ) || index + rl > lbuf || rl > 32 * 1024 * 1024 {
4095
+ if dlen < 0 || int (slen ) > ( dlen - recordHashSize ) || dlen > int (rl ) || index + rl > lbuf || rl > rlBadThresh {
4059
4096
// This means something is off.
4060
4097
// TODO(dlc) - Add into bad list?
4061
4098
return errCorruptState
4062
4099
}
4063
4100
4064
4101
// Clear erase bit.
4065
4102
seq = seq &^ ebit
4066
- // Adjust if we guessed wrong.
4067
- if seq != 0 && seq < fseq {
4068
- fseq = seq
4069
- }
4103
+
4070
4104
// We defer checksum checks to individual msg cache lookups to amortize costs and
4071
4105
// not introduce latency for first message from a newly loaded block.
4072
- idx = append (idx , index )
4073
- mb .cache .lrl = uint32 (rl )
4074
- index += mb .cache .lrl
4106
+ if seq >= mb .first .seq {
4107
+ // Track that we do not have holes.
4108
+ // Not expected but did see it in the field.
4109
+ if pseq > 0 && seq != pseq + 1 {
4110
+ if mb .dmap == nil {
4111
+ mb .dmap = make (map [uint64 ]struct {})
4112
+ }
4113
+ for dseq := pseq + 1 ; dseq < seq ; dseq ++ {
4114
+ idx = append (idx , dbit )
4115
+ mb .dmap [dseq ] = struct {}{}
4116
+ }
4117
+ }
4118
+ pseq = seq
4119
+
4120
+ idx = append (idx , index )
4121
+ mb .cache .lrl = uint32 (rl )
4122
+ // Adjust if we guessed wrong.
4123
+ if seq != 0 && seq < fseq {
4124
+ fseq = seq
4125
+ }
4126
+ }
4127
+ index += rl
4075
4128
}
4076
4129
mb .cache .buf = buf
4077
4130
mb .cache .idx = idx
@@ -4407,6 +4460,9 @@ const hbit = 1 << 31
4407
4460
// Used for marking erased message sequences.
4408
4461
const ebit = 1 << 63
4409
4462
4463
+ // Used to mark a bad index as deleted.
4464
+ const dbit = 1 << 30
4465
+
4410
4466
// Will do a lookup from cache.
4411
4467
// Lock should be held.
4412
4468
func (mb * msgBlock ) cacheLookup (seq uint64 , sm * StoreMsg ) (* StoreMsg , error ) {
@@ -4417,6 +4473,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
4417
4473
// If we have a delete map check it.
4418
4474
if mb .dmap != nil {
4419
4475
if _ , ok := mb .dmap [seq ]; ok {
4476
+ mb .llts = time .Now ().UnixNano ()
4420
4477
return nil , errDeletedMsg
4421
4478
}
4422
4479
}
@@ -4559,9 +4616,9 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
4559
4616
hh .Write (hdr [4 :20 ])
4560
4617
hh .Write (data [:slen ])
4561
4618
if hasHeaders {
4562
- hh .Write (data [slen + 4 : dlen - 8 ])
4619
+ hh .Write (data [slen + 4 : dlen - recordHashSize ])
4563
4620
} else {
4564
- hh .Write (data [slen : dlen - 8 ])
4621
+ hh .Write (data [slen : dlen - recordHashSize ])
4565
4622
}
4566
4623
if ! bytes .Equal (hh .Sum (nil ), data [len (data )- 8 :]) {
4567
4624
return nil , errBadMsg
0 commit comments