
Commit 347f2f7

refactor(ingestor): WAL Replay controller: Use singleflight to ensure only one inflight flush is happening (#19831)
1 parent 7912a67 commit 347f2f7
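For context, golang.org/x/sync/singleflight deduplicates concurrent calls that share a key: the first caller of Group.Do runs the wrapped function, every caller that arrives while it is in flight blocks until that single execution finishes, and all of them receive the same result. A minimal standalone sketch of the pattern this commit adopts (the key name, goroutine count, and sleep duration are illustrative only, not taken from the Loki code):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var (
		sf   singleflight.Group
		runs atomic.Int32 // how many times the flush body actually executed
		wg   sync.WaitGroup
	)

	flush := func() (interface{}, error) {
		runs.Add(1)
		time.Sleep(100 * time.Millisecond) // simulate a slow flush
		return nil, nil
	}

	// Five goroutines request a flush at roughly the same time; singleflight
	// coalesces them so the flush body runs once and every caller blocks on
	// that shared execution.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _, shared := sf.Do("flush", flush)
			fmt.Println("flush finished, result shared:", shared)
		}()
	}
	wg.Wait()

	fmt.Printf("flush body executed %d time(s)\n", runs.Load())
}

The values returned by Do are discarded in the commit's Flush below because the flush closure returns (nil, nil); callers only need to know that a flush has completed before re-checking the memory ceiling.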

2 files changed: +87 -41 lines changed

pkg/ingester/replay_controller.go

Lines changed: 29 additions & 41 deletions
@@ -1,11 +1,10 @@
 package ingester
 
 import (
-	"sync"
-
 	"github.com/dustin/go-humanize"
 	"github.com/go-kit/log/level"
 	"go.uber.org/atomic"
+	"golang.org/x/sync/singleflight"
 
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )
@@ -49,17 +48,16 @@ type replayController struct {
 	currentBytes atomic.Int64
 	cfg          WALConfig
 	metrics      *ingesterMetrics
-	cond         *sync.Cond
-	isFlushing   atomic.Bool
-	flusher      Flusher
+
+	flusher Flusher
+	flushSF singleflight.Group
 }
 
 // flusher is expected to reduce pressure via calling Sub
 func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flusher) *replayController {
 	return &replayController{
 		cfg:     cfg,
 		metrics: metrics,
-		cond:    sync.NewCond(&sync.Mutex{}),
 		flusher: flusher,
 	}
 }
@@ -79,51 +77,41 @@ func (c *replayController) Cur() int {
 }
 
 func (c *replayController) Flush() {
-	if c.isFlushing.CompareAndSwap(false, true) {
-		c.metrics.recoveryIsFlushing.Set(1)
-		prior := c.currentBytes.Load()
-		level.Debug(util_log.Logger).Log(
-			"msg", "replay flusher pre-flush",
-			"bytes", humanize.Bytes(uint64(prior)),
-		)
-
-		c.flusher.Flush()
-
-		after := c.currentBytes.Load()
-		level.Debug(util_log.Logger).Log(
-			"msg", "replay flusher post-flush",
-			"bytes", humanize.Bytes(uint64(after)),
-		)
-
-		c.isFlushing.Store(false)
-		c.metrics.recoveryIsFlushing.Set(0)
-
-		// Broadcast after lock is acquired to prevent race conditions with cpu scheduling
-		// where the flush code could finish before the goroutine which initiated it gets to call
-		// c.cond.Wait()
-		c.cond.L.Lock()
-		c.cond.Broadcast()
-		c.cond.L.Unlock()
-	}
+	// Use singleflight to ensure only one flush happens at a time
+	_, _, _ = c.flushSF.Do("flush", func() (interface{}, error) {
+		c.flush()
+		return nil, nil
+	})
+}
+
+func (c *replayController) flush() {
+	c.metrics.recoveryIsFlushing.Set(1)
+	prior := c.currentBytes.Load()
+	level.Debug(util_log.Logger).Log(
+		"msg", "replay flusher pre-flush",
+		"bytes", humanize.Bytes(uint64(prior)),
+	)
+
+	c.flusher.Flush()
+
+	after := c.currentBytes.Load()
+	level.Debug(util_log.Logger).Log(
+		"msg", "replay flusher post-flush",
+		"bytes", humanize.Bytes(uint64(after)),
+	)
+
+	c.metrics.recoveryIsFlushing.Set(0)
 }
 
 // WithBackPressure is expected to call replayController.Add in the passed function to increase the managed byte count.
 // It will call the function as long as there is expected room before the memory cap and will then flush data intermittently
 // when needed.
 func (c *replayController) WithBackPressure(fn func() error) error {
-	// Account for backpressure and wait until there's enough memory to continue replaying the WAL
-	c.cond.L.Lock()
-
 	// use 90% as a threshold since we'll be adding to it.
 	for c.Cur() > int(c.cfg.ReplayMemoryCeiling)*9/10 {
 		// too much backpressure, flush
-		go c.Flush()
-		c.cond.Wait()
+		c.Flush()
 	}
 
-	// Don't hold the lock while executing the provided function.
-	// This ensures we can run functions concurrently.
-	c.cond.L.Unlock()
-
 	return fn()
 }

pkg/ingester/replay_controller_test.go

Lines changed: 58 additions & 0 deletions
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
 
 	"github.com/grafana/loki/v3/pkg/util/constants"
 )
@@ -75,5 +76,62 @@ func TestReplayController(t *testing.T) {
 		"WithBackPressure", // add 50, total 50
 	}
 	require.Equal(t, expected, ops)
+}
+
+// Test to ensure only one flush happens at a time when multiple goroutines call WithBackPressure
+func TestReplayControllerConcurrentFlushes(t *testing.T) {
+	t.Run("multiple goroutines wait for single flush", func(t *testing.T) {
+		var rc *replayController
+
+		var flushesStarted atomic.Int32
+		var flushInProgress atomic.Bool
+
+		flusher := newDumbFlusher(func() {
+			// Check if a flush is already in progress, fail if there is one.
+			if !flushInProgress.CompareAndSwap(false, true) {
+				t.Error("Multiple flushes running concurrently!")
+			}
+
+			flushesStarted.Add(1)
+
+			time.Sleep(100 * time.Millisecond) // Simulate a slow flush
+
+			rc.Sub(200)
+
+			flushInProgress.Store(false)
+		})
+
+		rc = newReplayController(nilMetrics(), WALConfig{ReplayMemoryCeiling: 100}, flusher)
+
+		// Fill to trigger flush condition (90% of 100 = 90 bytes threshold)
+		rc.Add(95)
+
+		// Launch multiple goroutines simultaneously
+		start := make(chan struct{})
+		var wg sync.WaitGroup
+		numGoroutines := 5
+
+		for i := 0; i < numGoroutines; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				<-start
+
+				err := rc.WithBackPressure(func() error {
+					rc.Add(20) // Each would trigger flush condition
+					return nil
+				})
+				require.NoError(t, err)
+			}()
+		}
+
+		// Start all goroutines at the same time
+		close(start)
+
+		wg.Wait()
 
+		// All goroutines should have shared a single flush
+		require.Equal(t, int32(1), flushesStarted.Load(),
+			"Singleflight should coalesce all flush requests into one")
+	})
 }
