diff --git a/packages/orchestrator/benchmark_test.go b/packages/orchestrator/benchmark_test.go index 3b4c18fd7e..9af248b253 100644 --- a/packages/orchestrator/benchmark_test.go +++ b/packages/orchestrator/benchmark_test.go @@ -128,7 +128,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) { sbxlogger.SetSandboxLoggerInternal(logger) // sbxlogger.SetSandboxLoggerExternal(logger) - networkPool, err := network.NewPool(noop.MeterProvider{}, 8, 8, clientID, networkConfig) + networkPool, err := network.NewPool(8, 8, clientID, networkConfig) require.NoError(b, err) go func() { networkPool.Populate(b.Context()) @@ -139,7 +139,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) { assert.NoError(b, err) }() - devicePool, err := nbd.NewDevicePool(noop.MeterProvider{}) + devicePool, err := nbd.NewDevicePool() require.NoError(b, err, "do you have the nbd kernel module installed?") go func() { devicePool.Populate(b.Context()) diff --git a/packages/orchestrator/cmd/build-template/main.go b/packages/orchestrator/cmd/build-template/main.go index 1b56d94986..cd58635926 100644 --- a/packages/orchestrator/cmd/build-template/main.go +++ b/packages/orchestrator/cmd/build-template/main.go @@ -116,7 +116,7 @@ func buildTemplate( return fmt.Errorf("could not create storage provider: %w", err) } - devicePool, err := nbd.NewDevicePool(noop.MeterProvider{}) + devicePool, err := nbd.NewDevicePool() if err != nil { return fmt.Errorf("could not create device pool: %w", err) } @@ -130,7 +130,7 @@ func buildTemplate( } }() - networkPool, err := network.NewPool(noop.MeterProvider{}, 8, 8, clientID, networkConfig) + networkPool, err := network.NewPool(8, 8, clientID, networkConfig) if err != nil { return fmt.Errorf("could not create network pool: %w", err) } diff --git a/packages/orchestrator/cmd/mock-nbd/mock.go b/packages/orchestrator/cmd/mock-nbd/mock.go index 6ae41c38a1..4de377116f 100644 --- a/packages/orchestrator/cmd/mock-nbd/mock.go +++ b/packages/orchestrator/cmd/mock-nbd/mock.go @@ -10,7 +10,6 
@@ import ( "github.com/google/uuid" "github.com/pojntfx/go-nbd/pkg/backend" - "go.opentelemetry.io/otel/metric/noop" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" @@ -87,7 +86,7 @@ func main() { done := make(chan os.Signal, 1) signal.Notify(done, os.Interrupt) - devicePool, err := nbd.NewDevicePool(noop.MeterProvider{}) + devicePool, err := nbd.NewDevicePool() if err != nil { fmt.Fprintf(os.Stderr, "failed to create device pool: %v\n", err) return diff --git a/packages/orchestrator/internal/sandbox/nbd/pool.go b/packages/orchestrator/internal/sandbox/nbd/pool.go index df043e52c1..fe6e2e3f2f 100644 --- a/packages/orchestrator/internal/sandbox/nbd/pool.go +++ b/packages/orchestrator/internal/sandbox/nbd/pool.go @@ -12,10 +12,11 @@ import ( "time" "github.com/bits-and-blooms/bitset" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" - "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" + "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) // maxSlotsReady is the number of slots that are ready to be used. @@ -25,6 +26,22 @@ const ( devicePoolCloseReleaseTimeout = 10 * time.Minute ) +var ( + meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd") + slotCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.nbd.slots_pool.ready", + metric.WithDescription("Number of nbd slots ready to be used."), + metric.WithUnit("{slot}"), + )) + acquired = utils.Must(meter.Int64Counter("orchestrator.nbd.slots_pool.acquired", + metric.WithDescription("Number of nbd slots acquired."), + metric.WithUnit("{slot}"), + )) + released = utils.Must(meter.Int64Counter("orchestrator.nbd.slots_pool.released", + metric.WithDescription("Number of nbd slots released."), + metric.WithUnit("{slot}"), + )) +) + // NoFreeSlotsError is returned when there are no free slots. // You can retry the request after some time. 
type NoFreeSlotsError struct{} @@ -62,11 +79,9 @@ type DevicePool struct { mu sync.Mutex slots chan DeviceSlot - - slotCounter metric.Int64UpDownCounter } -func NewDevicePool(meterProvider metric.MeterProvider) (*DevicePool, error) { +func NewDevicePool() (*DevicePool, error) { maxDevices, err := getMaxDevices() if err != nil { return nil, fmt.Errorf("failed to get max devices: %w", err) @@ -76,17 +91,10 @@ func NewDevicePool(meterProvider metric.MeterProvider) (*DevicePool, error) { return nil, errors.New("max devices is 0") } - meter := meterProvider.Meter("orchestrator.device.pool") - counter, err := telemetry.GetUpDownCounter(meter, telemetry.NBDkSlotSReadyPoolCounterMeterName) - if err != nil { - return nil, fmt.Errorf("failed to get slot pool counter: %w", err) - } - pool := &DevicePool{ - done: make(chan struct{}), - usedSlots: bitset.New(maxDevices), - slots: make(chan DeviceSlot, int(math.Min(maxSlotsReady, float64(maxDevices)))), - slotCounter: counter, + done: make(chan struct{}), + usedSlots: bitset.New(maxDevices), + slots: make(chan DeviceSlot, int(math.Min(maxSlotsReady, float64(maxDevices)))), } return pool, nil @@ -133,7 +141,7 @@ func (d *DevicePool) Populate(ctx context.Context) { } failedCount = 0 - d.slotCounter.Add(ctx, 1) + slotCounter.Add(ctx, 1) // Use select to avoid panic if context is canceled before writing select { @@ -249,12 +257,13 @@ func (d *DevicePool) GetDevice(ctx context.Context) (DeviceSlot, error) { case <-ctx.Done(): return 0, ctx.Err() case slot := <-d.slots: - d.slotCounter.Add(ctx, -1) + acquired.Add(ctx, 1) + slotCounter.Add(ctx, -1) return slot, nil } } -func (d *DevicePool) release(idx DeviceSlot) error { +func (d *DevicePool) release(ctx context.Context, idx DeviceSlot) error { free, err := d.isDeviceFree(idx) if err != nil { return fmt.Errorf("failed to check if device is free: %w", err) @@ -268,6 +277,7 @@ func (d *DevicePool) release(idx DeviceSlot) error { d.usedSlots.Clear(uint(idx)) d.mu.Unlock() + 
released.Add(ctx, 1) return nil } @@ -294,7 +304,7 @@ func (d *DevicePool) ReleaseDevice(ctx context.Context, idx DeviceSlot, opts ... attempt++ - err := d.release(idx) + err := d.release(ctx, idx) if err == nil { return nil } diff --git a/packages/orchestrator/internal/sandbox/network/pool.go b/packages/orchestrator/internal/sandbox/network/pool.go index 635260aec6..566f6e46e8 100644 --- a/packages/orchestrator/internal/sandbox/network/pool.go +++ b/packages/orchestrator/internal/sandbox/network/pool.go @@ -7,10 +7,13 @@ import ( "sync" "github.com/caarlos0/env/v11" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" + "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) const ( @@ -18,6 +21,31 @@ const ( ReusedSlotsPoolSize = 100 ) +var ( + meter = otel.Meter("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network") + + newSlotsAvailableCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.network.slots_pool.new", + metric.WithDescription("Number of new network slots ready to be used."), + metric.WithUnit("{slot}"), + )) + reusableSlotsAvailableCounter = utils.Must(meter.Int64UpDownCounter("orchestrator.network.slots_pool.reused", + metric.WithDescription("Number of reused network slots ready to be used."), + metric.WithUnit("{slot}"), + )) + acquiredSlots = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.acquired", + metric.WithDescription("Number of network slots acquired."), + metric.WithUnit("{slot}"), + )) + returnedSlotCounter = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.returned", + metric.WithDescription("Number of network slots returned."), + metric.WithUnit("{slot}"), + )) + releasedSlotCounter = utils.Must(meter.Int64Counter("orchestrator.network.slots_pool.released", + metric.WithDescription("Number of network slots released."), + metric.WithUnit("{slot}"), + )) +) + 
type Config struct { // Using reserver IPv4 in range that is used for experiments and documentation // https://en.wikipedia.org/wiki/Reserved_IP_addresses @@ -36,45 +64,29 @@ type Pool struct { done chan struct{} doneOnce sync.Once - newSlots chan *Slot - reusedSlots chan *Slot - newSlotCounter metric.Int64UpDownCounter - reusedSlotCounter metric.Int64UpDownCounter + newSlots chan *Slot + reusedSlots chan *Slot slotStorage Storage } var ErrClosed = errors.New("cannot read from a closed pool") -func NewPool(meterProvider metric.MeterProvider, newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) { +func NewPool(newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) { newSlots := make(chan *Slot, newSlotsPoolSize-1) reusedSlots := make(chan *Slot, reusedSlotsPoolSize) - meter := meterProvider.Meter("orchestrator.network.pool") - - newSlotCounter, err := telemetry.GetUpDownCounter(meter, telemetry.NewNetworkSlotSPoolCounterMeterName) - if err != nil { - return nil, fmt.Errorf("failed to create new slot counter: %w", err) - } - - reusedSlotsCounter, err := telemetry.GetUpDownCounter(meter, telemetry.ReusedNetworkSlotSPoolCounterMeterName) - if err != nil { - return nil, fmt.Errorf("failed to create reused slot counter: %w", err) - } - slotStorage, err := NewStorage(vrtSlotsSize, nodeID, config) if err != nil { return nil, fmt.Errorf("failed to create slot storage: %w", err) } pool := &Pool{ - config: config, - done: make(chan struct{}), - newSlots: newSlots, - reusedSlots: reusedSlots, - newSlotCounter: newSlotCounter, - reusedSlotCounter: reusedSlotsCounter, - slotStorage: slotStorage, + config: config, + done: make(chan struct{}), + newSlots: newSlots, + reusedSlots: reusedSlots, + slotStorage: slotStorage, } return pool, nil @@ -114,7 +126,7 @@ func (p *Pool) Populate(ctx context.Context) { continue } - p.newSlotCounter.Add(ctx, 1) + newSlotsAvailableCounter.Add(ctx, 1) p.newSlots <- slot } } @@ 
-127,7 +139,8 @@ func (p *Pool) Get(ctx context.Context, allowInternet bool) (*Slot, error) { case <-p.done: return nil, ErrClosed case s := <-p.reusedSlots: - p.reusedSlotCounter.Add(ctx, -1) + reusableSlotsAvailableCounter.Add(ctx, -1) + acquiredSlots.Add(ctx, 1, metric.WithAttributes(attribute.String("pool", "reused"))) telemetry.ReportEvent(ctx, "reused network slot") slot = s @@ -138,7 +151,8 @@ func (p *Pool) Get(ctx context.Context, allowInternet bool) (*Slot, error) { case <-ctx.Done(): return nil, ctx.Err() case s := <-p.newSlots: - p.newSlotCounter.Add(ctx, -1) + newSlotsAvailableCounter.Add(ctx, -1) + acquiredSlots.Add(ctx, 1, metric.WithAttributes(attribute.String("pool", "new"))) telemetry.ReportEvent(ctx, "new network slot") slot = s @@ -165,7 +179,7 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error { err := slot.ResetInternet(ctx) if err != nil { // Cleanup the slot if resetting internet fails - if cerr := p.cleanup(slot); cerr != nil { + if cerr := p.cleanup(ctx, slot); cerr != nil { return fmt.Errorf("reset internet: %w; cleanup: %w", err, cerr) } @@ -178,9 +192,10 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error { case <-p.done: return ErrClosed case p.reusedSlots <- slot: - p.reusedSlotCounter.Add(ctx, 1) + returnedSlotCounter.Add(ctx, 1) + reusableSlotsAvailableCounter.Add(ctx, 1) default: - err := p.cleanup(slot) + err := p.cleanup(ctx, slot) if err != nil { return fmt.Errorf("failed to return slot '%d': %w", slot.Idx, err) } @@ -189,7 +204,7 @@ func (p *Pool) Return(ctx context.Context, slot *Slot) error { return nil } -func (p *Pool) cleanup(slot *Slot) error { +func (p *Pool) cleanup(ctx context.Context, slot *Slot) error { var errs []error err := slot.RemoveNetwork() @@ -202,10 +217,12 @@ func (p *Pool) cleanup(slot *Slot) error { errs = append(errs, fmt.Errorf("failed to release slot '%d': %w", slot.Idx, err)) } + releasedSlotCounter.Add(ctx, 1) + return errors.Join(errs...) 
} -func (p *Pool) Close(_ context.Context) error { +func (p *Pool) Close(ctx context.Context) error { zap.L().Info("Closing network pool") p.doneOnce.Do(func() { @@ -215,7 +232,7 @@ func (p *Pool) Close(_ context.Context) error { var errs []error for slot := range p.newSlots { - err := p.cleanup(slot) + err := p.cleanup(ctx, slot) if err != nil { errs = append(errs, fmt.Errorf("failed to cleanup slot '%d': %w", slot.Idx, err)) } @@ -224,7 +241,7 @@ func (p *Pool) Close(_ context.Context) error { close(p.reusedSlots) for slot := range p.reusedSlots { - err := p.cleanup(slot) + err := p.cleanup(ctx, slot) if err != nil { errs = append(errs, fmt.Errorf("failed to cleanup slot '%d': %w", slot.Idx, err)) } diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 75cd38117b..8f2e098428 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -326,7 +326,7 @@ func run(config cfg.Config) (success bool) { closers = append(closers, closer{"sandbox proxy", sandboxProxy.Close}) // device pool - devicePool, err := nbd.NewDevicePool(tel.MeterProvider) + devicePool, err := nbd.NewDevicePool() if err != nil { zap.L().Fatal("failed to create device pool", zap.Error(err)) } @@ -337,7 +337,7 @@ func run(config cfg.Config) (success bool) { closers = append(closers, closer{"device pool", devicePool.Close}) // network pool - networkPool, err := network.NewPool(tel.MeterProvider, network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig) + networkPool, err := network.NewPool(network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig) if err != nil { zap.L().Fatal("failed to create network pool", zap.Error(err)) } diff --git a/packages/shared/pkg/telemetry/meters.go b/packages/shared/pkg/telemetry/meters.go index 7a93a92d31..2285230806 100644 --- a/packages/shared/pkg/telemetry/meters.go +++ b/packages/shared/pkg/telemetry/meters.go @@ -34,10 +34,7 @@ const ( ) const ( - 
SandboxCountMeterName UpDownCounterType = "api.env.instance.running" - NewNetworkSlotSPoolCounterMeterName UpDownCounterType = "orchestrator.network.slots_pool.new" - ReusedNetworkSlotSPoolCounterMeterName UpDownCounterType = "orchestrator.network.slots_pool.reused" - NBDkSlotSReadyPoolCounterMeterName UpDownCounterType = "orchestrator.nbd.slots_pool.read" + SandboxCountMeterName UpDownCounterType = "api.env.instance.running" ) const ( @@ -120,17 +117,11 @@ var observableCounterUnits = map[ObservableCounterType]string{ } var upDownCounterDesc = map[UpDownCounterType]string{ - SandboxCountMeterName: "Counter of started instances.", - ReusedNetworkSlotSPoolCounterMeterName: "Number of reused network slots ready to be used.", - NewNetworkSlotSPoolCounterMeterName: "Number of new network slots ready to be used.", - NBDkSlotSReadyPoolCounterMeterName: "Number of nbd slots ready to be used.", + SandboxCountMeterName: "Counter of started instances.", } var upDownCounterUnits = map[UpDownCounterType]string{ - SandboxCountMeterName: "{sandbox}", - ReusedNetworkSlotSPoolCounterMeterName: "{slot}", - NewNetworkSlotSPoolCounterMeterName: "{slot}", - NBDkSlotSReadyPoolCounterMeterName: "{slot}", + SandboxCountMeterName: "{sandbox}", } var observableUpDownCounterDesc = map[ObservableUpDownCounterType]string{