Skip to content

Commit 51025c6

Browse files
authored
CBG-4220 [3.2.0 backport] Pending unused sequences shouldn't update high cache sequence (#7102)
If unused sequences ranges are pushed to pendingLogs, they shouldn't increment the channel cache's highSequenceCached until they are processed.
1 parent c85d3a7 commit 51025c6

File tree

3 files changed

+10
-10
lines changed

3 files changed

+10
-10
lines changed

db/change_cache.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,6 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
576576
} else {
577577
changedChannels.Add(unusedSeq)
578578
}
579-
c.channelCache.AddUnusedSequence(change)
580579
if c.notifyChange != nil && len(changedChannels) > 0 {
581580
c.notifyChange(ctx, changedChannels)
582581
}
@@ -599,7 +598,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
599598
}
600599
changedChannels := c.processEntry(ctx, change)
601600
allChangedChannels = allChangedChannels.Update(changedChannels)
602-
c.channelCache.AddUnusedSequence(change)
603601
if c.notifyChange != nil {
604602
c.notifyChange(ctx, allChangedChannels)
605603
}
@@ -609,9 +607,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
609607
// push unused range to either pending or skipped lists based on current state of the change cache
610608
allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)
611609

612-
// update high seq cached
613-
c.channelCache.AddUnusedSequence(&LogEntry{Sequence: toSequence})
614-
615610
if c.notifyChange != nil {
616611
c.notifyChange(ctx, allChangedChannels)
617612
}
@@ -804,8 +799,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []chann
804799
}
805800
delete(c.receivedSeqs, change.Sequence)
806801

807-
// If unused sequence or principal, we're done after updating sequence
802+
// If unused sequence, notify the cache and return
808803
if change.DocID == "" {
804+
c.channelCache.AddUnusedSequence(change)
809805
return nil
810806
}
811807

db/change_cache_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2430,7 +2430,7 @@ func TestReleasedSequenceRangeHandlingEverythingPending(t *testing.T) {
24302430
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
24312431
assert.Equal(c, uint64(2), testChangeCache.nextSequence)
24322432
dbContext.UpdateCalculatedStats(ctx)
2433-
assert.Equal(c, int64(25), dbContext.DbStats.CacheStats.HighSeqCached.Value())
2433+
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.HighSeqCached.Value())
24342434
}, time.Second*10, time.Millisecond*100)
24352435
}
24362436

@@ -2536,7 +2536,7 @@ func TestReleasedSequenceRangeHandlingEverythingPendingLowPendingCapacity(t *tes
25362536
defer testChangeCache.Stop(ctx)
25372537
require.NoError(t, err)
25382538

2539-
// process unused sequence range
2539+
// process unused sequence range, will be sent to pending. Triggers seq 1 being sent to skipped
25402540
testChangeCache.releaseUnusedSequenceRange(ctx, 2, 25, time.Now())
25412541

25422542
require.EventuallyWithT(t, func(c *assert.CollectT) {
@@ -2647,7 +2647,7 @@ func TestReleasedSequenceRangeHandlingSingleSequence(t *testing.T) {
26472647
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
26482648
assert.Equal(c, uint64(1), testChangeCache.nextSequence)
26492649
dbContext.UpdateCalculatedStats(ctx)
2650-
assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.HighSeqCached.Value())
2650+
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.HighSeqCached.Value())
26512651
}, time.Second*10, time.Millisecond*100)
26522652

26532653
// process change that should overload pending and push sequence 1 to skipped

db/channel_cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,11 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) {
188188

189189
// Add unused Sequence notifies the cache of an unused sequence update. Updates the cache's high sequence
190190
func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {
191-
c.updateHighCacheSequence(change.Sequence)
191+
if change.EndSequence > 0 {
192+
c.updateHighCacheSequence(change.EndSequence)
193+
} else {
194+
c.updateHighCacheSequence(change.Sequence)
195+
}
192196
}
193197

194198
// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence

0 commit comments

Comments
 (0)