
Commit 4f53ee2

[3.1.9 backport] CBG-4016 Refresh sequence allocator before incr during nextSequenceGreaterThan (#6960)

* CBG-4015 Refresh sequence allocator before incr during nextSequenceGreaterThan. When nextSequenceGreaterThan requires a sequence larger than what's already present in the allocator's batch, fetch the current _sync:seq before computing the required increment size, to account for variable allocation rates by other allocators.
* Logging fixes
* Test enhancements from PR review
1 parent 6d852b1 commit 4f53ee2
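To make the change concrete before reading the diff: the sketch below is a minimal, self-contained illustration of the refresh-before-increment pattern the commit message describes. The type and helper names (syncSeqStore, fetchSyncSeq, incrBy, nextGreaterThan) are invented for this sketch, not the allocator's real API, and the normal-allocation branch is collapsed into the same function for brevity.

```go
package main

import "fmt"

// syncSeqStore stands in for the shared _sync:seq counter document;
// the name and methods are hypothetical, for illustration only.
type syncSeqStore struct{ value uint64 }

func (s *syncSeqStore) fetchSyncSeq() uint64 { return s.value }

func (s *syncSeqStore) incrBy(n uint64) uint64 {
	s.value += n
	return s.value
}

// nextGreaterThan returns a sequence guaranteed to exceed existingSequence.
// The key point of the fix: re-read the shared counter first, so the catch-up
// size is computed from the current _sync:seq rather than a stale local maximum.
func nextGreaterThan(store *syncSeqStore, existingSequence, batchSize uint64) uint64 {
	syncSeq := store.fetchSyncSeq() // refresh before incr
	var numberToRelease uint64
	if existingSequence > syncSeq {
		numberToRelease = existingSequence - syncSeq // catch-up gap only
	}
	allocatedTo := store.incrBy(numberToRelease + batchSize)
	// Sequences (allocatedTo-batchSize-numberToRelease, allocatedTo-batchSize]
	// are surplus catch-up sequences and would be released; the first sequence
	// of the fresh batch is assigned.
	return allocatedTo - batchSize + 1
}

func main() {
	store := &syncSeqStore{value: 20}           // another node already advanced _sync:seq to 20
	fmt.Println(nextGreaterThan(store, 15, 10)) // 21: _sync:seq is already past 15, so no catch-up release
}
```

With _sync:seq already at 20, requiring a sequence greater than 15 needs no catch-up releases; the next batch simply starts at 21, which matches the updated test expectations further down.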

File tree: 2 files changed, +196 -19 lines changed


db/sequence_allocator.go (45 additions, 13 deletions)

```diff
@@ -182,6 +182,18 @@ func (s *sequenceAllocator) nextSequence(ctx context.Context) (sequence uint64,
 	return sequence, nil
 }
 
+// _releaseCurrentBatch releases any unused sequences currently held by the allocator
+func (s *sequenceAllocator) _releaseCurrentBatch(ctx context.Context) (numReleased uint64, err error) {
+	if s.max > s.last {
+		numReleased, err = s.releaseSequenceRange(ctx, s.last+1, s.max)
+		if err != nil {
+			return 0, err
+		}
+		s.last = s.max
+	}
+	return numReleased, nil
+}
+
 // nextSequenceGreaterThan increments _sync:seq such that it's greater than existingSequence + s.sequenceBatchSize
 // In the case where our local s.max < _sync:seq (another node has incremented _sync:seq), we may be releasing
 // sequences greater than existingSequence, but we will only ever release sequences allocated by this node's incr operation
@@ -221,19 +233,45 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
 
 	}
 
-	// If the target sequence is greater than the highest in our batch (s.max), we want to:
-	// (a) Reserve n sequences past _sync:seq, where n = existingSequence - s.max. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
-	// updated _sync:seq since we last updated s.max.), then
+	// At this point we need to allocate a sequence that's larger than what's in our current batch, so we first need to release the current batch while holding the mutex.
+	var numReleasedBatch uint64
+	numReleasedBatch, err = s._releaseCurrentBatch(ctx)
+	if err != nil {
+		base.InfofCtx(ctx, base.KeyCache, "Unable to release current batch during nextSequenceGreaterThan for existing sequence %d. Will be handled by skipped sequence handling. %v", existingSequence, err)
+	}
+	releasedSequenceCount += numReleasedBatch
+
+	syncSeq, err := s.getSequence()
+	if err != nil {
+		base.WarnfCtx(ctx, "Unable to fetch current sequence during nextSequenceGreaterThan for existing sequence %d. Error:%v", existingSequence, err)
+		s.mutex.Unlock()
+		return 0, 0, err
+	}
+
+	// If the target sequence is less than the current _sync:seq, allocate as normal using _nextSequence
+	if syncSeq >= targetSequence {
+		sequence, sequencesReserved, err := s._nextSequence(ctx)
+		s.mutex.Unlock()
+		if err != nil {
+			return 0, 0, err
+		}
+		if sequencesReserved {
+			s.reserveNotify <- struct{}{}
+		}
+		s.dbStats.SequenceAssignedCount.Add(1)
+		return sequence, releasedSequenceCount, nil
+	}
+
+	// If the target sequence is greater than the current _sync:seq, we want to:
+	// (a) Reserve n sequences past _sync:seq, where n = existingSequence - syncSeq. It's ok if the resulting sequence exceeds targetSequence (if other nodes have allocated sequences and
+	// updated _sync:seq since we last updated s.max.)
 	// (b) Allocate a standard batch of sequences, and assign a sequence from that batch in the usual way.
 	// (c) Release any previously allocated sequences (s.last to s.max)
 	// (d) Release the reserved sequences from part (a)
 	// We can perform (a) and (b) as a single increment operation, but (c) and (d) aren't necessarily contiguous blocks and must be released
 	// separately
 
-	prevAllocReleaseFrom := s.last + 1
-	prevAllocReleaseTo := s.max
-
-	numberToRelease := existingSequence - s.max
+	numberToRelease := existingSequence - syncSeq
 	numberToAllocate := s.sequenceBatchSize
 	allocatedToSeq, err := s.incrementSequence(numberToRelease + numberToAllocate)
 	if err != nil {
@@ -253,12 +291,6 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
 	s.dbStats.SequenceReservedCount.Add(int64(numberToRelease + numberToAllocate))
 	s.dbStats.SequenceAssignedCount.Add(1)
 
-	// Release previously allocated sequences (c), if any
-	released, err := s.releaseSequenceRange(ctx, prevAllocReleaseFrom, prevAllocReleaseTo)
-	if err != nil {
-		base.WarnfCtx(ctx, "Error returned when releasing sequence range [%d-%d] for previously allocated sequences. Will be handled by skipped sequence handling. Error:%v", prevAllocReleaseFrom, prevAllocReleaseTo, err)
-	}
-	releasedSequenceCount += released
 	// Release the newly allocated sequences that were used to catch up to existingSequence (d)
 	if numberToRelease > 0 {
 		releaseTo := allocatedToSeq - numberToAllocate
```
db/sequence_allocator_test.go (151 additions, 6 deletions)

```diff
@@ -11,7 +11,12 @@ licenses/APL2.txt.
 package db
 
 import (
+	"context"
+	"fmt"
+	"log"
+	"math/rand"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -378,14 +383,154 @@ func TestNextSequenceGreaterThanMultiNode(t *testing.T) {
 	assertNewAllocatorStats(t, dbStatsB, 1, 10, 2, 4)
 
 	// calling nextSequenceGreaterThan(15) on A will increment _sync:seq by 5 on its previously allocated sequence (10).
-	// Since node B has already updated _sync:seq to 20, will result in:
+	// Since node B has already updated _sync:seq to 20, calling nextSequenceGreaterThan(15) on A will result in:
 	// node A releasing sequences 2-10 from its existing buffer
-	// node A allocating and releasing sequences 21-24
-	// node A adding sequences 25-35 to its buffer, and assigning 25 to the current request
+	// node A adding sequences 21-30 to its buffer, and assigning 21 to the current request
 	nextSequence, releasedSequenceCount, err = a.nextSequenceGreaterThan(ctx, 15)
 	assert.NoError(t, err)
-	assert.Equal(t, uint64(26), nextSequence)
-	assert.Equal(t, 14, int(releasedSequenceCount))
-	assertNewAllocatorStats(t, dbStatsA, 2, 25, 2, 14)
+	assert.Equal(t, uint64(21), nextSequence)
+	assert.Equal(t, 9, int(releasedSequenceCount))
+	assertNewAllocatorStats(t, dbStatsA, 2, 20, 2, 9)
 
 }
+
+// TestVariableRateAllocators simulates the following scenario:
+//  - import nodes have high sequence allocation rate
+//  - client-facing nodes have low sequence allocation rate
+//  - documents are imported, then the same documents are immediately updated by clients
+//    (including sequence validation triggering nextSequenceGreaterThan)
+//
+// Ensures we don't release more sequences than would be expected based on allocator batch size
+func TestVariableRateAllocators(t *testing.T) {
+	ctx := base.TestCtx(t)
+	bucket := base.GetTestBucket(t)
+	defer bucket.Close(ctx)
+
+	var expectedAllocations uint64
+
+	dataStore := bucket.GetSingleDataStore()
+	stats, err := base.NewSyncGatewayStats()
+	require.NoError(t, err)
+
+	importStats, err := stats.NewDBStats("import", false, false, false, nil, nil)
+	require.NoError(t, err)
+
+	importFeedAllocator, err := newSequenceAllocator(ctx, dataStore, importStats.DatabaseStats, base.DefaultMetadataKeys)
+	require.NoError(t, err)
+
+	// All test allocators are stopped when allocatorCtx is closed
+	allocatorCtx, cancelFunc := context.WithCancel(ctx)
+
+	// Start import node allocator, performing 10000 allocations/second.
+	var allocatorWg sync.WaitGroup
+	allocatorWg.Add(1)
+	go func() {
+		count := runAllocator(allocatorCtx, importFeedAllocator, 100*time.Microsecond) // 10000 writes/second
+		atomic.AddUint64(&expectedAllocations, count)
+		allocatorWg.Done()
+	}()
+
+	// Start multiple client node allocators, performing 100 allocations/second
+	clientAllocators := make([]*sequenceAllocator, 0)
+	clientAllocatorCount := 10
+	for i := 0; i < clientAllocatorCount; i++ {
+		clientStats, err := stats.NewDBStats(fmt.Sprintf("client%d", i), false, false, false, nil, nil)
+		require.NoError(t, err)
+		clientAllocator, err := newSequenceAllocator(ctx, dataStore, clientStats.DatabaseStats, base.DefaultMetadataKeys)
+		require.NoError(t, err)
+		clientAllocators = append(clientAllocators, clientAllocator)
+		allocatorWg.Add(1)
+		go func() {
+			count := runAllocator(allocatorCtx, clientAllocator, 10*time.Millisecond) // 100 writes/second
+			atomic.AddUint64(&expectedAllocations, count)
+			allocatorWg.Done()
+		}()
+	}
+
+	// Wait for allocators to get up to maximum batch size
+	time.Sleep(500 * time.Millisecond)
+	documentCount := 10
+	var updateWg sync.WaitGroup
+	updateWg.Add(documentCount)
+	for i := 0; i < documentCount; i++ {
+		go func() {
+			_ = multiNodeUpdate(t, ctx, importFeedAllocator, clientAllocators, 5, 10*time.Millisecond)
+			updateWg.Done()
+			atomic.AddUint64(&expectedAllocations, 6)
+		}()
+	}
+
+	updateWg.Wait()
+
+	// Stop background allocation goroutines, wait for them to close
+	cancelFunc()
+	allocatorWg.Wait()
+
+	log.Printf("expectedSequence (num allocations):%v", atomic.LoadUint64(&expectedAllocations))
+
+	importFeedAllocator.Stop(ctx)
+	numAssigned := importFeedAllocator.dbStats.SequenceAssignedCount.Value()
+	numReleased := importFeedAllocator.dbStats.SequenceReleasedCount.Value()
+	for _, allocator := range clientAllocators {
+		allocator.Stop(ctx)
+		numAssigned += allocator.dbStats.SequenceAssignedCount.Value()
+		clientSequencesReleased := allocator.dbStats.SequenceReleasedCount.Value()
+		numReleased += clientSequencesReleased
+	}
+
+	log.Printf("Total sequences released + assigned: %v", numReleased+numAssigned)
+	actualSequence, err := importFeedAllocator.getSequence()
+	log.Printf("actual sequence (getSequence): %v", actualSequence)
+	require.NoError(t, err)
+}
+
+// multiNodeUpdate obtains an initial sequence from an import allocator (import node), then performs repeated updates to the doc using a random allocator from the pool (simulating a random SG node).
+// Performs nextSequenceGreaterThan, then ensures that the allocator doesn't release more than the sequence batch size
+func multiNodeUpdate(t *testing.T, ctx context.Context, importAllocator *sequenceAllocator, clientAllocators []*sequenceAllocator, updateCount int, interval time.Duration) (releasedCount uint64) {
+	currentSequence, _ := importAllocator.nextSequence(ctx)
+
+	for i := 0; i < updateCount; i++ {
+		allocatorIndex := rand.Intn(len(clientAllocators))
+		clientAllocator := clientAllocators[allocatorIndex]
+		nextSequence, err := clientAllocator.nextSequence(ctx)
+		require.NoError(t, err, "nextSequence error: %v", err)
+		if nextSequence < currentSequence {
+			prevNext := nextSequence
+			var numReleased uint64
+			nextSequence, numReleased, err = clientAllocator.nextSequenceGreaterThan(ctx, currentSequence)
+			require.NoError(t, err, "nextSequenceGreaterThan error: %v", err)
+			log.Printf("allocator %d released %d sequences because next < current (%d < %d)", allocatorIndex, numReleased, prevNext, currentSequence)
+			// At most, clientAllocator should only need to release its current batch
+			assert.LessOrEqual(t, numReleased, getClientSequenceBatchSize(clientAllocator))
+			releasedCount += numReleased
+		}
+		currentSequence = nextSequence
+		time.Sleep(interval)
+	}
+
+	return releasedCount
+}
+
+func runAllocator(ctx context.Context, a *sequenceAllocator, frequency time.Duration) (allocationCount uint64) {
+	allocationCount = 0
+	ticker := time.NewTicker(frequency)
+	for {
+		select {
+		case <-ticker.C:
+			_, _ = a.nextSequence(ctx)
+			allocationCount++
+		case <-ctx.Done():
+			ticker.Stop()
+			log.Printf("allocator count: %v", allocationCount)
+			return allocationCount
+		}
+	}
+}
+
+func getClientSequenceBatchSize(allocator *sequenceAllocator) uint64 {
+	allocator.mutex.Lock()
+	defer allocator.mutex.Unlock()
+	return allocator.sequenceBatchSize
+}
```
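The bound asserted in multiNodeUpdate (numReleased never exceeding the batch size) is what the refresh buys. Below is a sketch with hypothetical values contrasting the released counts under the old stale-s.max computation and the new fetched-_sync:seq computation:

```go
package main

import "fmt"

func main() {
	staleMax := uint64(10)            // slow node's local batch maximum (s.max)
	syncSeq := uint64(10_000)         // freshly fetched _sync:seq, advanced by a fast import node
	existingSequence := uint64(9_990) // sequence handed out earlier by the import node

	// Old behavior: catch-up size computed against the stale local maximum.
	oldRelease := existingSequence - staleMax // 9980 sequences reserved only to be released

	// New behavior: _sync:seq is fetched first. Because the counter has already
	// moved past the target, the normal allocation path is taken and only this
	// node's own unused batch (at most sequenceBatchSize) is released.
	var newCatchUp uint64
	if existingSequence > syncSeq {
		newCatchUp = existingSequence - syncSeq
	}
	fmt.Println(oldRelease, newCatchUp) // 9980 0
}
```

Since _sync:seq can only have advanced past any sequence another node already handed out, the refreshed computation keeps each catch-up's releases bounded by the allocator's own batch, regardless of how fast other allocators are running.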
