Skip to content

Commit 2f3b406

Browse files
Optimise enforceMsgPerSubjectLimit (#6871)
When all of the state was lost and we need to rebuild and enforce a max messages per subject limit, then the `enforceMsgPerSubjectLimit` function could linearly scan multiple times and become very expensive if there are lots of subjects. This refactors the function to use new subject tree intersection logic to more quickly work out if there is any overlap between the subjects needing attention and the subjects in a given block, meaning we only need to scan the store once. Signed-off-by: Neil Twigg <neil@nats.io>
2 parents 516fda6 + 751fbc8 commit 2f3b406

File tree

4 files changed

+165
-36
lines changed

4 files changed

+165
-36
lines changed

server/filestore.go

Lines changed: 59 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4492,11 +4492,18 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
44924492
var numMsgs uint64
44934493

44944494
// collect all that are not correct.
4495-
needAttention := make(map[string]*psi)
4495+
needAttention := stree.NewSubjectTree[uint64]()
4496+
fblk, lblk := uint32(math.MaxUint32), uint32(0)
44964497
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
44974498
numMsgs += psi.total
44984499
if psi.total > maxMsgsPer {
4499-
needAttention[string(subj)] = psi
4500+
needAttention.Insert(subj, psi.total)
4501+
if psi.fblk < fblk {
4502+
fblk = psi.fblk
4503+
}
4504+
if psi.lblk > lblk {
4505+
lblk = psi.lblk
4506+
}
45004507
}
45014508
return true
45024509
})
@@ -4517,55 +4524,71 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
45174524
// Rebuild fs state too.
45184525
fs.rebuildStateLocked(nil)
45194526
// Need to redo blocks that need attention.
4520-
needAttention = make(map[string]*psi)
4527+
needAttention.Empty()
4528+
fblk, lblk = uint32(math.MaxUint32), uint32(0)
45214529
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
45224530
if psi.total > maxMsgsPer {
4523-
needAttention[string(subj)] = psi
4531+
needAttention.Insert(subj, psi.total)
4532+
if psi.fblk < fblk {
4533+
fblk = psi.fblk
4534+
}
4535+
if psi.lblk > lblk {
4536+
lblk = psi.lblk
4537+
}
45244538
}
45254539
return true
45264540
})
45274541
}
45284542

4543+
// If nothing to do then stop.
4544+
if fblk == math.MaxUint32 {
4545+
return
4546+
}
4547+
45294548
// Collect all the msgBlks we alter.
45304549
blks := make(map[*msgBlock]struct{})
45314550

45324551
// For re-use below.
45334552
var sm StoreMsg
4534-
4535-
// Walk all subjects that need attention here.
4536-
for subj, info := range needAttention {
4537-
total, start, stop := info.total, info.fblk, info.lblk
4538-
4539-
for i := start; i <= stop; i++ {
4540-
mb := fs.bim[i]
4541-
if mb == nil {
4542-
continue
4543-
}
4544-
// Grab the ss entry for this subject in case sparse.
4545-
mb.mu.Lock()
4546-
mb.ensurePerSubjectInfoLoaded()
4547-
ss, ok := mb.fss.Find(stringToBytes(subj))
4548-
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
4549-
mb.recalculateForSubj(subj, ss)
4553+
var fss *stree.SubjectTree[*SimpleState]
4554+
for i := fblk; i <= lblk; i++ {
4555+
mb := fs.bim[i]
4556+
if mb == nil {
4557+
continue
4558+
}
4559+
mb.mu.Lock()
4560+
mb.ensurePerSubjectInfoLoaded()
4561+
// It isn't safe to intersect mb.fss directly, because removeMsgViaLimits modifies it
4562+
// during the iteration, which can cause us to miss keys. We won't copy the entire
4563+
// SimpleState structs though but rather just take pointers for speed.
4564+
fss = fss.Empty()
4565+
mb.fss.IterFast(func(subject []byte, val *SimpleState) bool {
4566+
fss.Insert(subject, val)
4567+
return true
4568+
})
4569+
mb.mu.Unlock()
4570+
stree.LazyIntersect(needAttention, fss, func(subj []byte, total *uint64, ssptr **SimpleState) {
4571+
if ssptr == nil || total == nil {
4572+
return
45504573
}
4551-
mb.mu.Unlock()
4552-
if ss == nil {
4553-
continue
4574+
ss := *ssptr
4575+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
4576+
mb.mu.Lock()
4577+
mb.recalculateForSubj(bytesToString(subj), ss)
4578+
mb.mu.Unlock()
45544579
}
4555-
for seq := ss.First; seq <= ss.Last && total > maxMsgsPer; {
4556-
m, _, err := mb.firstMatching(subj, false, seq, &sm)
4557-
if err == nil {
4558-
seq = m.seq + 1
4559-
if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
4560-
total--
4561-
blks[mb] = struct{}{}
4562-
}
4563-
} else {
4564-
// On error just do single increment.
4565-
seq++
4580+
for first := ss.First; *total > maxMsgsPer && first <= ss.Last; {
4581+
m, _, err := mb.firstMatching(bytesToString(subj), false, first, &sm)
4582+
if err != nil {
4583+
break
4584+
}
4585+
first = m.seq + 1
4586+
if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
4587+
blks[mb] = struct{}{}
4588+
*total--
45664589
}
45674590
}
4568-
}
4591+
})
45694592
}
45704593

45714594
// Expire the cache if we can.
@@ -8946,7 +8969,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
89468969
}
89478970

89488971
// Create new one regardless.
8949-
mb.fss = stree.NewSubjectTree[SimpleState]()
8972+
mb.fss = mb.fss.Empty()
89508973

89518974
var smv StoreMsg
89528975
fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)

server/jetstream_cluster_long_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,3 +1143,50 @@ func TestLongClusterJetStreamRestartThenScaleStreamReplicas(t *testing.T) {
11431143
cancel()
11441144
wg.Wait()
11451145
}
1146+
1147+
func TestLongFileStoreEnforceMsgPerSubjectLimit(t *testing.T) {
1148+
td := t.TempDir()
1149+
fs, err := newFileStore(
1150+
FileStoreConfig{StoreDir: td, BlockSize: 1024},
1151+
StreamConfig{
1152+
Name: "zzz", Subjects: []string{"test.>"}, Storage: FileStorage,
1153+
},
1154+
)
1155+
require_NoError(t, err)
1156+
defer fs.Stop()
1157+
1158+
t.Logf("Starting publishes")
1159+
for i := 0; i < 100_000; i++ {
1160+
_, _, err := fs.StoreMsg(fmt.Sprintf("test.%06d", i), nil, nil, 0)
1161+
require_NoError(t, err)
1162+
}
1163+
// Now update some of them. Leave a bit of a mess with some big gaps.
1164+
for i := 0; i < 5_000_000; i++ {
1165+
n := rand.Int31n(100_000)
1166+
if n < 5000 {
1167+
continue
1168+
}
1169+
_, _, err := fs.StoreMsg(fmt.Sprintf("test.%06d", n), nil, nil, 0)
1170+
require_NoError(t, err)
1171+
}
1172+
t.Logf("Publish complete")
1173+
1174+
require_NoError(t, fs.Stop())
1175+
fs, err = newFileStore(
1176+
FileStoreConfig{StoreDir: td, BlockSize: 1024},
1177+
StreamConfig{
1178+
Name: "zzz", Subjects: []string{"test.>"}, Storage: FileStorage,
1179+
MaxMsgsPer: 1,
1180+
},
1181+
)
1182+
require_NoError(t, err)
1183+
defer fs.Stop()
1184+
1185+
// Mangle the filestore state and then see how long it takes to enforce
1186+
// the per-subject limit.
1187+
fs.state.Msgs++
1188+
start := time.Now()
1189+
fs.enforceMsgPerSubjectLimit(false)
1190+
require_LessThan(t, time.Since(start), time.Minute)
1191+
t.Logf("Took %s", time.Since(start))
1192+
}

server/stree/stree.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,29 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject
418418
}
419419
return true
420420
}
421+
422+
// LazyIntersect iterates the smaller of the two provided subject trees and
423+
// looks for matching entries in the other. It is lazy in that it does not
424+
// aggressively optimize against repeated walks, but is considerably faster
425+
// in most cases than intersecting against a potentially large sublist.
426+
func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func([]byte, *TL, *TR)) {
427+
if tl.root == nil || tr.root == nil {
428+
return
429+
}
430+
// Iterate over the smaller tree to reduce the number of rounds.
431+
if tl.Size() <= tr.Size() {
432+
tl.IterFast(func(key []byte, v1 *TL) bool {
433+
if v2, ok := tr.Find(key); ok {
434+
cb(key, v1, v2)
435+
}
436+
return true
437+
})
438+
} else {
439+
tr.IterFast(func(key []byte, v2 *TR) bool {
440+
if v1, ok := tl.Find(key); ok {
441+
cb(key, v1, v2)
442+
}
443+
return true
444+
})
445+
}
446+
}

server/stree/stree_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -923,3 +923,36 @@ func TestSubjectTreeMatchHasFWCNoPanic(t *testing.T) {
923923
st.Insert(subj, 1)
924924
st.Match([]byte("."), func(subject []byte, val *int) {})
925925
}
926+
927+
func TestSubjectTreeLazyIntersect(t *testing.T) {
928+
st1 := NewSubjectTree[int]()
929+
st2 := NewSubjectTree[int]()
930+
931+
// Should cause an intersection.
932+
st1.Insert([]byte("foo.bar"), 1)
933+
st2.Insert([]byte("foo.bar"), 1)
934+
935+
// Should cause an intersection.
936+
st1.Insert([]byte("foo.bar.baz.qux"), 1)
937+
st2.Insert([]byte("foo.bar.baz.qux"), 1)
938+
939+
// Should not cause any intersections.
940+
st1.Insert([]byte("bar"), 1)
941+
st2.Insert([]byte("baz"), 1)
942+
st1.Insert([]byte("a.b.c"), 1)
943+
st2.Insert([]byte("a.b.d"), 1)
944+
st1.Insert([]byte("a.b.ee"), 1)
945+
st2.Insert([]byte("a.b.e"), 1)
946+
st1.Insert([]byte("bb.c.d"), 1)
947+
st2.Insert([]byte("b.c.d"), 1)
948+
st2.Insert([]byte("foo.bar.baz.qux.alice"), 1)
949+
st2.Insert([]byte("foo.bar.baz.qux.bob"), 1)
950+
951+
intersected := map[string]int{}
952+
LazyIntersect(st1, st2, func(key []byte, val1, val2 *int) {
953+
intersected[string(key)]++
954+
})
955+
require_Equal(t, len(intersected), 2)
956+
require_Equal(t, intersected["foo.bar"], 1)
957+
require_Equal(t, intersected["foo.bar.baz.qux"], 1)
958+
}

0 commit comments

Comments
 (0)