Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/orchestrator/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) {
sbxlogger.SetSandboxLoggerInternal(logger)
// sbxlogger.SetSandboxLoggerExternal(logger)

networkPool, err := network.NewPool(noop.MeterProvider{}, 8, 8, clientID, networkConfig)
networkPool, err := network.NewPool(8, 8, clientID, networkConfig)
require.NoError(b, err)
go func() {
networkPool.Populate(b.Context())
Expand All @@ -139,7 +139,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) {
assert.NoError(b, err)
}()

devicePool, err := nbd.NewDevicePool(noop.MeterProvider{})
devicePool, err := nbd.NewDevicePool()
require.NoError(b, err, "do you have the nbd kernel module installed?")
go func() {
devicePool.Populate(b.Context())
Expand Down
4 changes: 2 additions & 2 deletions packages/orchestrator/cmd/build-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func buildTemplate(
return fmt.Errorf("could not create storage provider: %w", err)
}

devicePool, err := nbd.NewDevicePool(noop.MeterProvider{})
devicePool, err := nbd.NewDevicePool()
if err != nil {
return fmt.Errorf("could not create device pool: %w", err)
}
Expand All @@ -130,7 +130,7 @@ func buildTemplate(
}
}()

networkPool, err := network.NewPool(noop.MeterProvider{}, 8, 8, clientID, networkConfig)
networkPool, err := network.NewPool(8, 8, clientID, networkConfig)
if err != nil {
return fmt.Errorf("could not create network pool: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions packages/orchestrator/cmd/mock-nbd/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/google/uuid"
"github.com/pojntfx/go-nbd/pkg/backend"
"go.opentelemetry.io/otel/metric/noop"

"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd"
Expand Down Expand Up @@ -87,7 +86,7 @@ func main() {

done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt)
devicePool, err := nbd.NewDevicePool(noop.MeterProvider{})
devicePool, err := nbd.NewDevicePool()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create device pool: %v\n", err)
return
Expand Down
46 changes: 28 additions & 18 deletions packages/orchestrator/internal/sandbox/nbd/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"time"

"github.com/bits-and-blooms/bitset"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

// maxSlotsReady is the number of slots that are ready to be used.
Expand All @@ -25,6 +26,22 @@ const (
devicePoolCloseReleaseTimeout = 10 * time.Minute
)

// meter is the package-level OpenTelemetry meter used for all nbd pool metrics.
var meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd")

// slotCounter tracks how many nbd slots are currently ready to be handed out.
var slotCounter = utils.Must(meter.Int64UpDownCounter(
	"orchestrator.nbd.slots_pool.ready",
	metric.WithDescription("Number of nbd slots ready to be used."),
	metric.WithUnit("{slot}"),
))

// acquired counts the cumulative number of nbd slots taken from the pool.
var acquired = utils.Must(meter.Int64Counter(
	"orchestrator.nbd.slots_pool.acquired",
	metric.WithDescription("Number of nbd slots acquired."),
	metric.WithUnit("{slot}"),
))

// released counts the cumulative number of nbd slots given back to the pool.
var released = utils.Must(meter.Int64Counter(
	"orchestrator.nbd.slots_pool.released",
	metric.WithDescription("Number of nbd slots released."),
	metric.WithUnit("{slot}"),
))

// NoFreeSlotsError is returned when there are no free slots.
// You can retry the request after some time.
type NoFreeSlotsError struct{}
Expand Down Expand Up @@ -62,11 +79,9 @@ type DevicePool struct {
mu sync.Mutex

slots chan DeviceSlot

slotCounter metric.Int64UpDownCounter
}

func NewDevicePool(meterProvider metric.MeterProvider) (*DevicePool, error) {
func NewDevicePool() (*DevicePool, error) {
maxDevices, err := getMaxDevices()
if err != nil {
return nil, fmt.Errorf("failed to get max devices: %w", err)
Expand All @@ -76,17 +91,10 @@ func NewDevicePool(meterProvider metric.MeterProvider) (*DevicePool, error) {
return nil, errors.New("max devices is 0")
}

meter := meterProvider.Meter("orchestrator.device.pool")
counter, err := telemetry.GetUpDownCounter(meter, telemetry.NBDkSlotSReadyPoolCounterMeterName)
if err != nil {
return nil, fmt.Errorf("failed to get slot pool counter: %w", err)
}

pool := &DevicePool{
done: make(chan struct{}),
usedSlots: bitset.New(maxDevices),
slots: make(chan DeviceSlot, int(math.Min(maxSlotsReady, float64(maxDevices)))),
slotCounter: counter,
done: make(chan struct{}),
usedSlots: bitset.New(maxDevices),
slots: make(chan DeviceSlot, int(math.Min(maxSlotsReady, float64(maxDevices)))),
}

return pool, nil
Expand Down Expand Up @@ -133,7 +141,7 @@ func (d *DevicePool) Populate(ctx context.Context) {
}
failedCount = 0

d.slotCounter.Add(ctx, 1)
slotCounter.Add(ctx, 1)

// Use select to avoid panic if context is canceled before writing
select {
Expand Down Expand Up @@ -249,12 +257,13 @@ func (d *DevicePool) GetDevice(ctx context.Context) (DeviceSlot, error) {
case <-ctx.Done():
return 0, ctx.Err()
case slot := <-d.slots:
d.slotCounter.Add(ctx, -1)
acquired.Add(ctx, 1)
slotCounter.Add(ctx, -1)
return slot, nil
}
}

func (d *DevicePool) release(idx DeviceSlot) error {
func (d *DevicePool) release(ctx context.Context, idx DeviceSlot) error {
free, err := d.isDeviceFree(idx)
if err != nil {
return fmt.Errorf("failed to check if device is free: %w", err)
Expand All @@ -268,6 +277,7 @@ func (d *DevicePool) release(idx DeviceSlot) error {
d.usedSlots.Clear(uint(idx))
d.mu.Unlock()

released.Add(ctx, 1)
return nil
}

Expand All @@ -294,7 +304,7 @@ func (d *DevicePool) ReleaseDevice(ctx context.Context, idx DeviceSlot, opts ...

attempt++

err := d.release(idx)
err := d.release(ctx, idx)
if err == nil {
return nil
}
Expand Down
85 changes: 51 additions & 34 deletions packages/orchestrator/internal/sandbox/network/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,45 @@ import (
"sync"

"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

const (
NewSlotsPoolSize = 32
ReusedSlotsPoolSize = 100
)

var (
	// meter is the package-level OpenTelemetry meter used for all network slot pool metrics.
	meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network")

	// newSlotsAvailableCounter tracks freshly created slots waiting in the pool.
	newSlotsAvailableCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.network.slots_pool.new",
		metric.WithDescription("Number of new network slots ready to be used."),
		// Fixed: unit was "{slot" (unterminated annotation); UCUM curly-brace
		// annotations must be balanced, matching every sibling metric below.
		metric.WithUnit("{slot}"),
	))
	// reusableSlotsAvailableCounter tracks previously used slots waiting to be reused.
	reusableSlotsAvailableCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.network.slots_pool.reused",
		metric.WithDescription("Number of reused network slots ready to be used."),
		metric.WithUnit("{slot}"),
	))
	// acquiredSlots counts slots handed out, attributed by pool ("new"/"reused") at call sites.
	acquiredSlots = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.acquired",
		metric.WithDescription("Number of network slots acquired."),
		metric.WithUnit("{slot}"),
	))
	// returnedSlotCounter counts slots returned to the reuse pool.
	returnedSlotCounter = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.returned",
		metric.WithDescription("Number of network slots returned."),
		metric.WithUnit("{slot}"),
	))
	// releasedSlotCounter counts slots fully cleaned up and released.
	releasedSlotCounter = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.released",
		metric.WithDescription("Number of network slots released."),
		metric.WithUnit("{slot}"),
	))
)

type Config struct {
// Using reserved IPv4 addresses in a range that is used for experiments and documentation
// https://en.wikipedia.org/wiki/Reserved_IP_addresses
Expand All @@ -36,45 +64,29 @@ type Pool struct {
done chan struct{}
doneOnce sync.Once

newSlots chan *Slot
reusedSlots chan *Slot
newSlotCounter metric.Int64UpDownCounter
reusedSlotCounter metric.Int64UpDownCounter
newSlots chan *Slot
reusedSlots chan *Slot

slotStorage Storage
}

// ErrClosed is returned when an operation is attempted on a pool that has
// already been closed. Compare with errors.Is.
var ErrClosed = errors.New("cannot read from a closed pool")

func NewPool(meterProvider metric.MeterProvider, newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) {
func NewPool(newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) {
newSlots := make(chan *Slot, newSlotsPoolSize-1)
reusedSlots := make(chan *Slot, reusedSlotsPoolSize)

meter := meterProvider.Meter("orchestrator.network.pool")

newSlotCounter, err := telemetry.GetUpDownCounter(meter, telemetry.NewNetworkSlotSPoolCounterMeterName)
if err != nil {
return nil, fmt.Errorf("failed to create new slot counter: %w", err)
}

reusedSlotsCounter, err := telemetry.GetUpDownCounter(meter, telemetry.ReusedNetworkSlotSPoolCounterMeterName)
if err != nil {
return nil, fmt.Errorf("failed to create reused slot counter: %w", err)
}

slotStorage, err := NewStorage(vrtSlotsSize, nodeID, config)
if err != nil {
return nil, fmt.Errorf("failed to create slot storage: %w", err)
}

pool := &Pool{
config: config,
done: make(chan struct{}),
newSlots: newSlots,
reusedSlots: reusedSlots,
newSlotCounter: newSlotCounter,
reusedSlotCounter: reusedSlotsCounter,
slotStorage: slotStorage,
config: config,
done: make(chan struct{}),
newSlots: newSlots,
reusedSlots: reusedSlots,
slotStorage: slotStorage,
}

return pool, nil
Expand Down Expand Up @@ -114,7 +126,7 @@ func (p *Pool) Populate(ctx context.Context) {
continue
}

p.newSlotCounter.Add(ctx, 1)
newSlotsAvailableCounter.Add(ctx, 1)
p.newSlots <- slot
}
}
Expand All @@ -127,7 +139,8 @@ func (p *Pool) Get(ctx context.Context, allowInternet bool) (*Slot, error) {
case <-p.done:
return nil, ErrClosed
case s := <-p.reusedSlots:
p.reusedSlotCounter.Add(ctx, -1)
reusableSlotsAvailableCounter.Add(ctx, -1)
acquiredSlots.Add(ctx, 1, metric.WithAttributes(attribute.String("pool", "reused")))
telemetry.ReportEvent(ctx, "reused network slot")

slot = s
Expand All @@ -138,7 +151,8 @@ func (p *Pool) Get(ctx context.Context, allowInternet bool) (*Slot, error) {
case <-ctx.Done():
return nil, ctx.Err()
case s := <-p.newSlots:
p.newSlotCounter.Add(ctx, -1)
newSlotsAvailableCounter.Add(ctx, -1)
acquiredSlots.Add(ctx, 1, metric.WithAttributes(attribute.String("pool", "new")))
telemetry.ReportEvent(ctx, "new network slot")

slot = s
Expand All @@ -165,7 +179,7 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error {
err := slot.ResetInternet(ctx)
if err != nil {
// Cleanup the slot if resetting internet fails
if cerr := p.cleanup(slot); cerr != nil {
if cerr := p.cleanup(ctx, slot); cerr != nil {
return fmt.Errorf("reset internet: %w; cleanup: %w", err, cerr)
}

Expand All @@ -178,9 +192,10 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error {
case <-p.done:
return ErrClosed
case p.reusedSlots <- slot:
p.reusedSlotCounter.Add(ctx, 1)
returnedSlotCounter.Add(ctx, 1)
reusableSlotsAvailableCounter.Add(ctx, 1)
default:
err := p.cleanup(slot)
err := p.cleanup(ctx, slot)
if err != nil {
return fmt.Errorf("failed to return slot '%d': %w", slot.Idx, err)
}
Expand All @@ -189,7 +204,7 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error {
return nil
}

func (p *Pool) cleanup(slot *Slot) error {
func (p *Pool) cleanup(ctx context.Context, slot *Slot) error {
var errs []error

err := slot.RemoveNetwork()
Expand All @@ -202,10 +217,12 @@ func (p *Pool) cleanup(slot *Slot) error {
errs = append(errs, fmt.Errorf("failed to release slot '%d': %w", slot.Idx, err))
}

releasedSlotCounter.Add(ctx, 1)

return errors.Join(errs...)
}

func (p *Pool) Close(_ context.Context) error {
func (p *Pool) Close(ctx context.Context) error {
zap.L().Info("Closing network pool")

p.doneOnce.Do(func() {
Expand All @@ -215,7 +232,7 @@ func (p *Pool) Close(_ context.Context) error {
var errs []error

for slot := range p.newSlots {
err := p.cleanup(slot)
err := p.cleanup(ctx, slot)
if err != nil {
errs = append(errs, fmt.Errorf("failed to cleanup slot '%d': %w", slot.Idx, err))
}
Expand All @@ -224,7 +241,7 @@ func (p *Pool) Close(_ context.Context) error {
close(p.reusedSlots)

for slot := range p.reusedSlots {
err := p.cleanup(slot)
err := p.cleanup(ctx, slot)
if err != nil {
errs = append(errs, fmt.Errorf("failed to cleanup slot '%d': %w", slot.Idx, err))
}
Expand Down
4 changes: 2 additions & 2 deletions packages/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func run(config cfg.Config) (success bool) {
closers = append(closers, closer{"sandbox proxy", sandboxProxy.Close})

// device pool
devicePool, err := nbd.NewDevicePool(tel.MeterProvider)
devicePool, err := nbd.NewDevicePool()
if err != nil {
zap.L().Fatal("failed to create device pool", zap.Error(err))
}
Expand All @@ -337,7 +337,7 @@ func run(config cfg.Config) (success bool) {
closers = append(closers, closer{"device pool", devicePool.Close})

// network pool
networkPool, err := network.NewPool(tel.MeterProvider, network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig)
networkPool, err := network.NewPool(network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig)
if err != nil {
zap.L().Fatal("failed to create network pool", zap.Error(err))
}
Expand Down
Loading