Commit 775af4c

Protect against CPU leaser leaks (#8190)
Protect against leaser leaks by warning when they happen and automatically dropping the oldest lease if more than 1000 are active at a time. Also restructure the internal lease implementation to track each lease separately from the per-CPU load, which makes it easier to warn about the location of a leak.
1 parent 6837414 commit 775af4c
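For context, the leaser's contract is that every Acquire must be paired with a call to the release function it returns; a lease "leaks" when a caller drops that function without invoking it. Below is a minimal sketch of the intended usage, based on the cpuset API in this commit (the `taskID` value and `runTask` helper are hypothetical placeholders):

```go
// Minimal usage sketch of the cpuset leaser API in this commit.
// taskID and runTask are placeholders, not part of the change.
leaser, err := cpuset.NewLeaser()
if err != nil {
	return err
}

// Acquire returns the selected NUMA node, the leased CPU indexes, and a
// release function. Dropping `cancel` without calling it is the leak this
// commit guards against: once MaxNumLeases (1000) leases are open, the
// oldest one is force-dropped and an alert reports its Acquire() location.
numaNode, cpus, cancel := leaser.Acquire(2000 /* milliCPU */, taskID)
defer cancel()

runTask(numaNode, cpus)
```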

File tree

5 files changed: +145 −22 lines

enterprise/server/remote_execution/containers/firecracker/firecracker_test.go

Lines changed: 4 additions & 0 deletions
```diff
@@ -261,6 +261,10 @@ func getTestEnv(ctx context.Context, t *testing.T, opts envOpts) *testenv.TestEnv
 	require.NoError(t, err)
 	env.SetCPULeaser(leaser)
 	flags.Set(t, "executor.cpu_leaser.enable", true)
+	t.Cleanup(func() {
+		orphanedLeases := leaser.TestOnlyGetOpenLeases()
+		require.Equal(t, 0, len(orphanedLeases))
+	})
 
 	return env
 }
```

enterprise/server/remote_execution/containers/ociruntime/ociruntime_test.go

Lines changed: 15 additions & 0 deletions
```diff
@@ -121,6 +121,11 @@ func installLeaserInEnv(t testing.TB, env *real_environment.RealEnv) {
 	require.NoError(t, err)
 	env.SetCPULeaser(leaser)
 	flags.Set(t, "executor.cpu_leaser.enable", true)
+
+	t.Cleanup(func() {
+		orphanedLeases := leaser.TestOnlyGetOpenLeases()
+		require.Equal(t, 0, len(orphanedLeases))
+	})
 }
 
 func TestRun(t *testing.T) {
@@ -628,6 +633,11 @@ func TestCreateFailureHasStderr(t *testing.T) {
 			ContainerImage: image,
 		},
 	})
+	t.Cleanup(func() {
+		err := c.Remove(ctx)
+		require.NoError(t, err)
+	})
+
 	require.NoError(t, err)
 	err = c.Create(ctx, wd+"nonexistent")
 	require.ErrorContains(t, err, "nonexistent")
@@ -658,6 +668,11 @@ func TestDevices(t *testing.T) {
 			ContainerImage: image,
 		},
 	})
+
+	t.Cleanup(func() {
+		err := c.Remove(ctx)
+		require.NoError(t, err)
+	})
 	require.NoError(t, err)
 	res := c.Run(ctx, &repb.Command{
 		Arguments: []string{"sh", "-e", "-c", `
```

enterprise/server/util/cpuset/BUILD

Lines changed: 1 addition & 0 deletions
```diff
@@ -12,6 +12,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//server/interfaces",
+        "//server/util/alert",
         "//server/util/flag",
         "//server/util/log",
         "//server/util/priority_queue",
```

enterprise/server/util/cpuset/cpuset.go

Lines changed: 93 additions & 22 deletions
```diff
@@ -3,11 +3,13 @@ package cpuset
 import (
 	"fmt"
 	"math"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
 
 	"github.com/buildbuddy-io/buildbuddy/server/interfaces"
+	"github.com/buildbuddy-io/buildbuddy/server/util/alert"
 	"github.com/buildbuddy-io/buildbuddy/server/util/flag"
 	"github.com/buildbuddy-io/buildbuddy/server/util/log"
 	"github.com/buildbuddy-io/buildbuddy/server/util/priority_queue"
@@ -20,14 +22,23 @@ var (
 	cpuLeaserOverhead    = flag.Float64("executor.cpu_leaser.overhead", .20, "The amount of extra CPU *above the task size* to include in a lease")
 	cpuLeaserMinOverhead = flag.Int("executor.cpu_leaser.min_overhead", 2, "Always ensure at least this many extra cpus are included in a lease")
 	cpuLeaserCPUSet      = flag.String("executor.cpu_leaser.cpuset", "", "Manual override for the set of CPUs that may be leased. Ignored if empty. Ex. '0-1,3'")
+	warnAboutLeaks       = flag.Bool("executor.cpu_leaser.warn_about_leaks", true, "If set, warn about leaked leases")
+)
+
+const (
+	// MaxNumLeases configures a safety valve to prevent a memory leak if
+	// leasers forget to close their leases.
+	MaxNumLeases = 1000
 )
 
 // Compile-time check that cpuLeaser implements the interface.
-var _ interfaces.CPULeaser = (*cpuLeaser)(nil)
+var _ interfaces.CPULeaser = (*CPULeaser)(nil)
 
-type cpuLeaser struct {
+type CPULeaser struct {
 	mu                 sync.Mutex
-	leases             map[cpuInfo][]string
+	cpus               []cpuInfo
+	leases             []lease
+	load               map[int]int
 	physicalProcessors int
 }
@@ -36,6 +47,12 @@ type cpuInfo struct {
 	physicalID int // numa node
 }
 
+type lease struct {
+	taskID   string
+	cpus     []int
+	location string // only set if *warnAboutLeaks is enabled
+}
+
 func toCPUInfos(processors []int, physicalID int) []cpuInfo {
 	infos := make([]cpuInfo, len(processors))
 	for i, p := range processors {
@@ -114,9 +131,10 @@ func Parse(s string) ([]int, error) {
 	return processors, nil
 }
 
-func NewLeaser() (interfaces.CPULeaser, error) {
-	cl := &cpuLeaser{
-		leases: make(map[cpuInfo][]string),
+func NewLeaser() (*CPULeaser, error) {
+	cl := &CPULeaser{
+		leases: make([]lease, 0, MaxNumLeases),
+		load:   make(map[int]int, 0),
 	}
 
 	var cpus []cpuInfo
@@ -130,13 +148,16 @@ func NewLeaser() (interfaces.CPULeaser, error) {
 		cpus = GetCPUs()
 	}
 
+	cl.cpus = make([]cpuInfo, len(cpus))
+
 	processors := make(map[int]struct{}, 0)
-	for _, cpu := range cpus {
-		cl.leases[cpu] = make([]string, 0)
+	for i, cpu := range cpus {
+		cl.cpus[i] = cpu
+		cl.load[cpu.processor] = 0
 		processors[cpu.physicalID] = struct{}{}
 	}
 	cl.physicalProcessors = len(processors)
-	log.Debugf("NewLeaser with %d processors and %d cores", cl.physicalProcessors, len(cl.leases))
+	log.Debugf("NewLeaser with %d processors and %d cores", cl.physicalProcessors, len(cl.cpus))
 	return cl, nil
 }
@@ -167,7 +188,7 @@ func WithNoOverhead() Option {
 
 // Acquire leases a set of CPUs (identified by index) for a task. The returned
 // function should be called to free the CPUs when they are no longer used.
-func (l *cpuLeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []int, func()) {
+func (l *CPULeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []int, func()) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
@@ -181,15 +202,16 @@ func (l *cpuLeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []int, func()) {
 	numCPUs := computeNumCPUs(milliCPU, !options.disableOverhead)
 	// If the CPU leaser is disabled; return all CPUs.
 	if !*cpuLeaserEnable {
-		numCPUs = len(l.leases)
+		numCPUs = len(l.cpus)
 	}
 
 	// Put all CPUs in a priority queue.
 	pq := priority_queue.New[cpuInfo]()
-	for cpuid, tasks := range l.leases {
+	for _, cpuInfo := range l.cpus {
+		numTasks := l.load[cpuInfo.processor]
 		// we want the least loaded cpus first, so give the
 		// cpus with more tasks a more negative score.
-		pq.Push(cpuid, -1*len(tasks))
+		pq.Push(cpuInfo, -1*numTasks)
 	}
 
 	// Get the set of CPUs, in order of load (incr).
@@ -217,30 +239,79 @@ func (l *cpuLeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []int, func()) {
 		if c.physicalID != selectedNode {
 			continue
 		}
-		// If the CPULeaser is enabled, actually track the lease.
-		if *cpuLeaserEnable {
-			l.leases[c] = append(l.leases[c], taskID)
-		}
 		leaseSet = append(leaseSet, c.processor)
+		l.load[c.processor] += 1
 		if len(leaseSet) == numCPUs {
 			break
 		}
 	}
 
+	// If the CPULeaser is enabled, actually track the lease.
+	if *cpuLeaserEnable {
+		if len(l.leases) >= MaxNumLeases {
+			droppedLease := l.leases[0]
+			l.leases = l.leases[1:]
+			for _, processor := range droppedLease.cpus {
+				l.load[processor] -= 1
+			}
+			if *warnAboutLeaks {
+				alert.UnexpectedEvent("cpu_leaser_leak", "Acquire() handle leak at %s!", droppedLease.location)
+			}
+		}
+
+		lease := lease{
+			taskID: taskID,
+			cpus:   leaseSet,
+		}
+		if *warnAboutLeaks {
+			if _, file, no, ok := runtime.Caller(1); ok {
+				lease.location = fmt.Sprintf("%s:%d", file, no)
+			}
+		}
+
+		l.leases = append(l.leases, lease)
+	}
+
 	log.Debugf("Leased %s to task: %q (%d milliCPU)", Format(leaseSet), taskID, milliCPU)
 	return selectedNode, leaseSet, func() {
 		l.release(taskID)
 	}
 }
 
-func (l *cpuLeaser) release(taskID string) {
+func (l *CPULeaser) release(taskID string) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
-	for cpuid, tasks := range l.leases {
-		l.leases[cpuid] = slices.DeleteFunc(tasks, func(s string) bool {
-			return s == taskID
-		})
+	var cpus []int
+	l.leases = slices.DeleteFunc(l.leases, func(l lease) bool {
+		if l.taskID == taskID {
+			cpus = l.cpus
+			return true
+		}
+		return false
+	})
+
+	for _, cpu := range cpus {
+		l.load[cpu] -= 1
 	}
+
 	log.Debugf("Task: %q released CPUs", taskID)
 }
+
+func (l *CPULeaser) TestOnlyGetOpenLeases() map[string]int {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	taskCounts := make(map[string]int)
+	for _, l := range l.leases {
+		taskCounts[l.taskID] = len(l.cpus)
+	}
+	return taskCounts
+}
+
+func (l *CPULeaser) TestOnlyGetLoads() map[int]int {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	return l.load
+}
```

enterprise/server/util/cpuset/cpuset_test.go

Lines changed: 32 additions & 0 deletions
```diff
@@ -193,3 +193,35 @@ func TestNonContiguousNumaNodes(t *testing.T) {
 	assert.NotElementsMatch(t, cpus1, cpus2)
 	assert.NotEqual(t, numa1, numa2)
 }
+
+func TestMaxNumberOfLeases(t *testing.T) {
+	flags.Set(t, "executor.cpu_leaser.enable", true)
+	flags.Set(t, "executor.cpu_leaser.overhead", 0)
+	flags.Set(t, "executor.cpu_leaser.min_overhead", 0)
+	flags.Set(t, "executor.cpu_leaser.cpuset", "0")
+
+	cs, err := cpuset.NewLeaser()
+	require.NoError(t, err)
+
+	numLeases := cpuset.MaxNumLeases * 3
+	taskIDs := make([]string, numLeases)
+	cancels := make([]func(), numLeases)
+	for i := 0; i < numLeases; i++ {
+		task := uuid.New()
+		_, _, cancel := cs.Acquire(1000, task)
+		taskIDs[i] = task
+		cancels[i] = cancel
+	}
+
+	require.Equal(t, cpuset.MaxNumLeases, len(cs.TestOnlyGetOpenLeases()))
+	for _, cancel := range cancels {
+		cancel()
+	}
+
+	// There should be no open leases, and no active load after
+	// cancelling all open leases.
+	require.Equal(t, 0, len(cs.TestOnlyGetOpenLeases()))
+	for _, load := range cs.TestOnlyGetLoads() {
+		require.Equal(t, 0, load)
+	}
+}
```
