diff --git a/.github/actions/start-services/action.yml b/.github/actions/start-services/action.yml index 1c135287eb..9614d8654e 100644 --- a/.github/actions/start-services/action.yml +++ b/.github/actions/start-services/action.yml @@ -44,6 +44,11 @@ runs: make -C tests/integration seed shell: bash + - name: Start Redis + run: | + docker run -d --name redis -p 6379:6379 redis:7.4.2-alpine + shell: bash + - name: Start Services env: ENVD_TIMEOUT: "60s" @@ -51,6 +56,7 @@ runs: SANDBOX_ACCESS_TOKEN_HASH_SEED: "abcdefghijklmnopqrstuvwxyz" SUPABASE_JWT_SECRETS: "supabasejwtsecretsupabasejwtsecret" TEMPLATE_MANAGER_HOST: "localhost:5008" + REDIS_URL: "localhost:6379" ARTIFACTS_REGISTRY_PROVIDER: "Local" STORAGE_PROVIDER: "Local" ENVIRONMENT: "local" diff --git a/packages/api/go.mod b/packages/api/go.mod index a86cd6df5b..f25684141b 100644 --- a/packages/api/go.mod +++ b/packages/api/go.mod @@ -38,7 +38,7 @@ require ( github.com/launchdarkly/go-sdk-common/v3 v3.1.0 github.com/pkg/errors v0.9.1 github.com/pressly/goose/v3 v3.24.2 - github.com/redis/go-redis/v9 v9.8.0 + github.com/redis/go-redis/v9 v9.10.0 github.com/stretchr/testify v1.10.0 google.golang.org/grpc v1.72.1 ) diff --git a/packages/api/go.sum b/packages/api/go.sum index 925ef0090d..c811a88f0b 100644 --- a/packages/api/go.sum +++ b/packages/api/go.sum @@ -859,8 +859,8 @@ github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeM github.com/prometheus/prometheus v0.47.2-0.20231010075449-4b9c19fe5510 h1:6ksZ7t1hNOzGPPs8DK7SvXQf6UfWzi+W5Z7PCBl8gx4= github.com/prometheus/prometheus v0.47.2-0.20231010075449-4b9c19fe5510/go.mod h1:UC0TwJiF90m2T3iYPQBKnGu8gv3s55dF/EgpTq8gyvo= github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= -github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= -github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= +github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= diff --git a/packages/client-proxy/go.mod b/packages/client-proxy/go.mod index 9b6e2b1849..c66a4b3be1 100644 --- a/packages/client-proxy/go.mod +++ b/packages/client-proxy/go.mod @@ -16,7 +16,7 @@ require ( github.com/jellydator/ttlcache/v3 v3.3.1-0.20250207140243-aefc35918359 github.com/miekg/dns v1.1.63 github.com/oapi-codegen/gin-middleware v1.0.2 - github.com/redis/go-redis/v9 v9.8.0 + github.com/redis/go-redis/v9 v9.10.0 github.com/soheilhy/cmux v0.1.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 go.opentelemetry.io/otel/metric v1.36.0 diff --git a/packages/client-proxy/go.sum b/packages/client-proxy/go.sum index 7cb21a1ae5..f1c7529600 100644 --- a/packages/client-proxy/go.sum +++ b/packages/client-proxy/go.sum @@ -673,8 +673,8 @@ github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2b github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg= github.com/prometheus/prometheus v0.47.2-0.20231010075449-4b9c19fe5510 h1:6ksZ7t1hNOzGPPs8DK7SvXQf6UfWzi+W5Z7PCBl8gx4= github.com/prometheus/prometheus v0.47.2-0.20231010075449-4b9c19fe5510/go.mod h1:UC0TwJiF90m2T3iYPQBKnGu8gv3s55dF/EgpTq8gyvo= -github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= -github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= +github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo= github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/packages/nomad/main.tf b/packages/nomad/main.tf index 8358212a49..ea87922d05 100644 --- a/packages/nomad/main.tf +++ b/packages/nomad/main.tf @@ -403,6 +403,7 @@ locals { otel_collector_grpc_endpoint = "localhost:${var.otel_collector_grpc_port}" allow_sandbox_internet = var.allow_sandbox_internet launch_darkly_api_key = trimspace(data.google_secret_manager_secret_version.launch_darkly_api_key.secret_data) + redis_url = data.google_secret_manager_secret_version.redis_url.secret_data != "redis.service.consul" ? "${data.google_secret_manager_secret_version.redis_url.secret_data}:${var.redis_port.port}" : "redis.service.consul:${var.redis_port.port}" } orchestrator_job_check = templatefile("${path.module}/orchestrator.hcl", merge( @@ -485,6 +486,7 @@ resource "nomad_job" "template_manager" { logs_collector_public_ip = var.logs_proxy_address orchestrator_services = "template-manager" allow_sandbox_internet = var.allow_sandbox_internet + redis_url = data.google_secret_manager_secret_version.redis_url.secret_data != "redis.service.consul" ? "${data.google_secret_manager_secret_version.redis_url.secret_data}:${var.redis_port.port}" : "redis.service.consul:${var.redis_port.port}" }) } resource "nomad_job" "loki" { diff --git a/packages/nomad/orchestrator.hcl b/packages/nomad/orchestrator.hcl index d35b799378..a1b18fa88e 100644 --- a/packages/nomad/orchestrator.hcl +++ b/packages/nomad/orchestrator.hcl @@ -70,6 +70,7 @@ EOT TEMPLATE_BUCKET_NAME = "${template_bucket_name}" OTEL_COLLECTOR_GRPC_ENDPOINT = "${otel_collector_grpc_endpoint}" ALLOW_SANDBOX_INTERNET = "${allow_sandbox_internet}" + REDIS_URL = "${redis_url}" %{ if launch_darkly_api_key != "" } LAUNCH_DARKLY_API_KEY = "${launch_darkly_api_key}" diff --git a/packages/nomad/template-manager.hcl b/packages/nomad/template-manager.hcl index 6987ed5916..a3e0f1c589 100644 --- a/packages/nomad/template-manager.hcl +++ b/packages/nomad/template-manager.hcl @@ -64,6 +64,7 @@ job "template-manager" { ORCHESTRATOR_SERVICES = "${orchestrator_services}" LOGS_COLLECTOR_PUBLIC_IP = "${logs_collector_public_ip}" ALLOW_SANDBOX_INTERNET = "${allow_sandbox_internet}" + REDIS_URL = "${redis_url}" %{ if !update_stanza } FORCE_STOP = "true" %{ endif } diff --git a/packages/orchestrator/cmd/build-template/main.go b/packages/orchestrator/cmd/build-template/main.go index 50663e04d5..c4c3b0427e 100644 --- a/packages/orchestrator/cmd/build-template/main.go +++ b/packages/orchestrator/cmd/build-template/main.go @@ -29,7 +29,9 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) -const proxyPort = 5007 +const ( + proxyPort = 5007 +) func main() { ctx, cancel := context.WithCancel(context.Background()) diff --git a/packages/orchestrator/go.mod b/packages/orchestrator/go.mod index 450578b16b..2aca7ed8da 100644 --- a/packages/orchestrator/go.mod +++ b/packages/orchestrator/go.mod @@ -25,6 +25,7 @@ require ( github.com/ngrok/firewall_toolkit v0.0.18 github.com/pkg/errors v0.9.1 github.com/pojntfx/go-nbd v0.3.2 + github.com/redis/go-redis/v9 v9.10.0 github.com/rs/zerolog v1.34.0 github.com/soheilhy/cmux v0.1.5 github.com/stretchr/testify v1.10.0 @@ -94,6 +95,7 @@ require ( github.com/containernetworking/cni v1.2.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dchest/uniuri v1.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/cli v28.2.2+incompatible // indirect github.com/docker/distribution v2.8.3+incompatible // indirect diff --git a/packages/orchestrator/go.sum b/packages/orchestrator/go.sum index 3db4df2e5d..de32709c60 100644 --- a/packages/orchestrator/go.sum +++ b/packages/orchestrator/go.sum @@ -174,6 +174,10 @@ github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= @@ -339,6 +343,8 @@ github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4 github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= @@ -941,6 +947,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= +github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/packages/orchestrator/internal/consts.go b/packages/orchestrator/internal/consts.go new file mode 100644 index 0000000000..2d0e30657c --- /dev/null +++ b/packages/orchestrator/internal/consts.go @@ -0,0 +1,13 @@ +package internal + +import ( + "github.com/e2b-dev/infra/packages/shared/pkg/env" +) + +const ( + defaultSandboxEventIP = "203.0.113.0" +) + +func GetSandboxEventIP() string { + return env.GetEnv("SANDBOX_EVENT_IP", defaultSandboxEventIP) +} diff --git a/packages/orchestrator/internal/sandbox/event/handlers.go b/packages/orchestrator/internal/sandbox/event/handlers.go new file mode 100644 index 0000000000..b3cadcbcd6 --- /dev/null +++ b/packages/orchestrator/internal/sandbox/event/handlers.go @@ -0,0 +1,123 @@ +package event + +import ( + "context" + "encoding/json" + "io" + "net/http" + "strings" + + "go.uber.org/zap" +) + +type EventHandler interface { + Path() string + HandlerFunc(w http.ResponseWriter, r *http.Request) +} + +type MetricsHandler struct{} + +func (h *MetricsHandler) Path() string { + return "/metrics" +} + +func (h *MetricsHandler) HandlerFunc(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + _, err := w.Write([]byte(`{"event_ack":true,"path":"/metrics"}`)) + if err != nil { + http.Error(w, "Failed to write response", http.StatusInternalServerError) + return + } +} + +// This is used to track ad-hoc events that are not handled by the event server. +type DefaultHandler struct { + store SandboxEventStore +} + +func (h *DefaultHandler) Path() string { + return "/" +} + +func (h *DefaultHandler) HandlerFunc(w http.ResponseWriter, r *http.Request) { + addr := r.RemoteAddr + ip := strings.Split(addr, ":")[0] + sandboxID, err := h.store.GetSandboxIP(ip) + if err != nil { + zap.L().Error("Failed to get sandbox ID from IP", zap.Error(err)) + http.Error(w, "Error handling event", http.StatusInternalServerError) + return + } + + zap.L().Debug("Received request from sandbox", zap.String("sandbox_id", sandboxID), zap.String("ip", ip)) + + if r.Method == http.MethodGet { + events, err := h.store.GetLastNEvents(sandboxID, 10) + if err != nil { + zap.L().Error("Failed to get event data for sandbox "+sandboxID, zap.Error(err)) + http.Error(w, "Failed to get event data for sandbox "+sandboxID, http.StatusInternalServerError) + return + } + + eventJSON, err := json.Marshal(events) + if err != nil { + zap.L().Error("Failed to marshal event data", zap.Error(err)) + http.Error(w, "Failed to marshal event data", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(eventJSON) + return + } + + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Create event data with path and body + eventData := SandboxEvent{ + Path: r.URL.Path, + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusInternalServerError) + return + } + + zap.L().Info("Received event", zap.String("body", string(body))) + + eventData.Body = make(map[string]any) + err = json.Unmarshal(body, &eventData.Body) + if err != nil { + zap.L().Error("Failed to unmarshal request body", zap.Error(err)) + http.Error(w, "Failed to unmarshal request body", http.StatusInternalServerError) + return + } + + // Store in Redis with sandboxID as key + err = h.store.AddEvent(sandboxID, &eventData, 0) + if err != nil { + zap.L().Error("Failed to store event data", zap.Error(err)) + http.Error(w, "Failed to store event data", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusCreated) + w.Write([]byte(`{"event_ack":true}`)) +} + +func NewEventHandlers(ctx context.Context, store SandboxEventStore) []EventHandler { + return []EventHandler{ + &MetricsHandler{}, + &DefaultHandler{store}, + } +} diff --git a/packages/orchestrator/internal/sandbox/event/server.go b/packages/orchestrator/internal/sandbox/event/server.go new file mode 100644 index 0000000000..a2efc61bcd --- /dev/null +++ b/packages/orchestrator/internal/sandbox/event/server.go @@ -0,0 +1,48 @@ +package event + +import ( + "context" + "fmt" + "net/http" +) + +// SandboxEventServer handles outbound HTTP requests from sandboxes calling the event.e2b.com endpoint +type SandboxEventServer struct { + server *http.Server +} + +func NewSandboxEventServer(port uint, handlers []EventHandler) *SandboxEventServer { + mux := http.NewServeMux() + + for _, handler := range handlers { + mux.HandleFunc(handler.Path(), handler.HandlerFunc) + } + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + } + + return &SandboxEventServer{ + server: server, + } +} + +func (p *SandboxEventServer) Start() error { + return p.server.ListenAndServe() +} + +func (p *SandboxEventServer) Close(ctx context.Context) error { + var err error + select { + case <-ctx.Done(): + err = p.server.Close() + default: + err = p.server.Shutdown(ctx) + } + if err != nil { + return fmt.Errorf("failed to shutdown event server: %w", err) + } + + return nil +} diff --git a/packages/orchestrator/internal/sandbox/event/store.go b/packages/orchestrator/internal/sandbox/event/store.go new file mode 100644 index 0000000000..3bcbe4ad9c --- /dev/null +++ b/packages/orchestrator/internal/sandbox/event/store.go @@ -0,0 +1,133 @@ +package event + +import ( + "context" + "encoding/json" + "time" + + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/trace" +) + +const ( + cacheTL = time.Hour * 24 * 30 + + EventPrefix = "ev:" + IPPrefix = "ip:" +) + +type SandboxEvent struct { + Path string `json:"path"` + Body map[string]any `json:"body"` +} + +func (i SandboxEvent) MarshalBinary() ([]byte, error) { + return json.Marshal(i) +} + +type sandboxEventStore struct { + ctx context.Context + tracer trace.Tracer + redisClient redis.UniversalClient +} + +type SandboxEventStore interface { + SetSandboxIP(sandboxId string, ip string) error + GetSandboxIP(sandboxId string) (string, error) + DelSandboxIP(sandboxId string) error + + GetLastEvent(sandboxId string) (*SandboxEvent, error) + GetLastNEvents(sandboxId string, n int) ([]*SandboxEvent, error) + AddEvent(sandboxId string, SandboxEvent *SandboxEvent, expiration time.Duration) error + DelEvent(sandboxId string) error + + Close() error +} + +func NewSandboxEventStore(ctx context.Context, tracer trace.Tracer, redisClient redis.UniversalClient) SandboxEventStore { + return &sandboxEventStore{ + ctx: ctx, + tracer: tracer, + redisClient: redisClient, + } +} + +func (c *sandboxEventStore) SetSandboxIP(sandboxId string, ip string) error { + return c.redisClient.Set(c.ctx, IPPrefix+ip, sandboxId, cacheTL).Err() +} + +func (c *sandboxEventStore) GetSandboxIP(ip string) (string, error) { + return c.redisClient.Get(c.ctx, IPPrefix+ip).Result() +} + +func (c *sandboxEventStore) DelSandboxIP(ip string) error { + return c.redisClient.Del(c.ctx, IPPrefix+ip).Err() +} + +func (c *sandboxEventStore) GetLastEvent(sandboxId string) (*SandboxEvent, error) { + _, span := c.tracer.Start(c.ctx, "sandbox-event-get-last") + defer span.End() + + result, err := c.redisClient.ZRevRangeWithScores(c.ctx, EventPrefix+sandboxId, 0, 0).Result() + if err != nil { + return nil, err + } + if len(result) == 0 { + return nil, redis.Nil + } + rawEvent := result[0].Member.(string) + + var event SandboxEvent + err = json.Unmarshal([]byte(rawEvent), &event) + if err != nil { + return nil, err + } + return &event, nil +} + +func (c *sandboxEventStore) GetLastNEvents(sandboxId string, n int) ([]*SandboxEvent, error) { + _, span := c.tracer.Start(c.ctx, "sandbox-event-get-last-n") + defer span.End() + + result, err := c.redisClient.ZRevRangeWithScores(c.ctx, EventPrefix+sandboxId, 0, int64(n-1)).Result() + if err != nil { + return nil, err + } + if len(result) == 0 { + return nil, redis.Nil + } + + events := make([]*SandboxEvent, 0, len(result)) + for _, item := range result { + rawEvent := item.Member.(string) + var event SandboxEvent + err = json.Unmarshal([]byte(rawEvent), &event) + if err != nil { + return nil, err + } + events = append(events, &event) + } + + return events, nil +} + +func (c *sandboxEventStore) AddEvent(sandboxId string, event *SandboxEvent, expiration time.Duration) error { + _, span := c.tracer.Start(c.ctx, "sandbox-event-store") + defer span.End() + + return c.redisClient.ZAdd(c.ctx, EventPrefix+sandboxId, redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: event, + }).Err() +} + +func (c *sandboxEventStore) DelEvent(sandboxId string) error { + _, span := c.tracer.Start(c.ctx, "sandbox-event-delete") + defer span.End() + + return c.redisClient.Del(c.ctx, EventPrefix+sandboxId).Err() +} + +func (c *sandboxEventStore) Close() error { + return c.redisClient.Close() +} diff --git a/packages/orchestrator/internal/sandbox/network/network_linux.go b/packages/orchestrator/internal/sandbox/network/network_linux.go index 4f3b2a333e..7987f2fb75 100644 --- a/packages/orchestrator/internal/sandbox/network/network_linux.go +++ b/packages/orchestrator/internal/sandbox/network/network_linux.go @@ -13,6 +13,8 @@ import ( "github.com/vishvananda/netlink" "github.com/vishvananda/netns" "go.uber.org/zap" + + "github.com/e2b-dev/infra/packages/orchestrator/internal" ) func (s *Slot) CreateNetwork() error { @@ -217,6 +219,13 @@ func (s *Slot) CreateNetwork() error { return fmt.Errorf("error creating postrouting rule: %w", err) } + // Redirect traffic destined to event server + eventIP := internal.GetSandboxEventIP() + err = tables.Append("nat", "PREROUTING", "-i", s.VethName(), "-p", "tcp", "-d", eventIP, "--dport", "80", "-j", "REDIRECT", "--to-port", "5010") + if err != nil { + return fmt.Errorf("error creating HTTP redirect rule to sandbox event server: %w", err) + } + return nil } diff --git a/packages/orchestrator/internal/sandbox/sandbox.go b/packages/orchestrator/internal/sandbox/sandbox.go index f645313106..709fef772d 100644 --- a/packages/orchestrator/internal/sandbox/sandbox.go +++ b/packages/orchestrator/internal/sandbox/sandbox.go @@ -18,6 +18,7 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/build" + "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/event" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/fc" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" @@ -241,6 +242,7 @@ func ResumeSandbox( devicePool *nbd.DevicePool, allowInternet, useClickhouseMetrics bool, + eventStore event.SandboxEventStore, ) (*Sandbox, *Cleanup, error) { childCtx, childSpan := tracer.Start(ctx, "new-sandbox") defer childSpan.End() @@ -421,6 +423,12 @@ func ResumeSandbox( return nil, cleanup, fmt.Errorf("failed to wait for sandbox start: %w", err) } + sandboxIP := ips.slot.HostIPString() + eventStore.SetSandboxIP(config.SandboxId, sandboxIP) + cleanup.AddPriority(func(ctx context.Context) error { + return eventStore.DelSandboxIP(sandboxIP) + }) + go sbx.Checks.Start() return sbx, cleanup, nil diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index 4427a1a828..93bc3e08c1 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -12,6 +12,7 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/grpcserver" "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" + "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/event" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/template" @@ -36,6 +37,7 @@ type server struct { devicePool *nbd.DevicePool persistence storage.StorageProvider featureFlags *featureflags.Client + eventStore event.SandboxEventStore } type Service struct { @@ -62,6 +64,7 @@ func New( proxy *proxy.SandboxProxy, sandboxes *smap.Map[*sandbox.Sandbox], featureFlags *featureflags.Client, + eventStore event.SandboxEventStore, ) (*Service, error) { srv := &Service{info: info} @@ -89,6 +92,7 @@ func New( devicePool: devicePool, persistence: persistence, featureFlags: featureFlags, + eventStore: eventStore, } meter := tel.MeterProvider.Meter("orchestrator.sandbox") diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 467c5a4f1b..ed72532990 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -69,6 +69,7 @@ func (s *server) Create(ctxConn context.Context, req *orchestrator.SandboxCreate s.devicePool, config.AllowSandboxInternet, metricsWriteFlag, + s.eventStore, ) if err != nil { zap.L().Error("failed to create sandbox, cleaning up", zap.Error(err)) diff --git a/packages/orchestrator/internal/template/build/rootfs.go b/packages/orchestrator/internal/template/build/rootfs.go index 4b690fd05f..19dbb846cc 100644 --- a/packages/orchestrator/internal/template/build/rootfs.go +++ b/packages/orchestrator/internal/template/build/rootfs.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/e2b-dev/infra/packages/orchestrator/internal" "github.com/e2b-dev/infra/packages/orchestrator/internal/template/build/ext4" "github.com/e2b-dev/infra/packages/orchestrator/internal/template/build/oci" "github.com/e2b-dev/infra/packages/orchestrator/internal/template/build/writer" @@ -197,6 +198,9 @@ ExecStart=-/sbin/agetty --noissue --autologin root %I 115200,38400,9600 vt102 ` hostname := "e2b.local" + eventProxyHostname := "events.e2b.dev" + + eventIP := internal.GetSandboxEventIP() hosts := fmt.Sprintf(`127.0.0.1 localhost ::1 localhost ip6-localhost ip6-loopback @@ -205,7 +209,8 @@ ff00:: ip6-mcastprefix ff02::1 ip6-allnodes ff02::2 ip6-allrouters 127.0.1.1 %s -`, hostname) +%s %s +`, hostname, eventIP, eventProxyHostname) e2bFile := fmt.Sprintf(`ENV_ID=%s BUILD_ID=%s diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index f3917872fa..acb26515e3 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" @@ -22,6 +23,7 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/metrics" "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" + "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/event" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" "github.com/e2b-dev/infra/packages/orchestrator/internal/server" @@ -41,8 +43,9 @@ type Closeable interface { } const ( - defaultPort = 5008 - defaultProxyPort = 5007 + defaultPort = 5008 + defaultProxyPort = 5007 + defaultEventProxyPort = 5010 sandboxMetricExportPeriod = 5 * time.Second @@ -59,6 +62,7 @@ var ( func main() { port := flag.Uint("port", defaultPort, "orchestrator server port") proxyPort := flag.Uint("proxy-port", defaultProxyPort, "orchestrator proxy port") + eventProxyPort := flag.Uint("event-proxy-port", defaultEventProxyPort, "orchestrator event proxy port") flag.Parse() if *port > math.MaxUint16 { @@ -69,7 +73,7 @@ func main() { log.Fatalf("%d is larger than maximum possible proxy port %d", proxyPort, math.MaxInt16) } - success := run(*port, *proxyPort) + success := run(*port, *proxyPort, *eventProxyPort) log.Println("Stopping orchestrator, success:", success) @@ -78,7 +82,7 @@ func main() { } } -func run(port, proxyPort uint) (success bool) { +func run(port, proxyPort, sbxEventServerPort uint) (success bool) { success = true services := service.GetServices() @@ -219,8 +223,31 @@ func run(port, proxyPort uint) (success bool) { zap.L().Fatal("failed to create sandbox proxy", zap.Error(err)) } + var redisClient redis.UniversalClient + if redisClusterUrl := os.Getenv("REDIS_CLUSTER_URL"); redisClusterUrl != "" { + redisClient = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{redisClusterUrl}, + MinIdleConns: 1, + }) + } else if redisUrl := os.Getenv("REDIS_URL"); redisUrl != "" { + redisClient = redis.NewClient(&redis.Options{ + Addr: redisUrl, + MinIdleConns: 1, + }) + } else { + zap.L().Fatal("REDIS_URL not set") + } + defer redisClient.Close() + tracer := tel.TracerProvider.Tracer(serviceName) + eventStore := event.NewSandboxEventStore(ctx, tracer, redisClient) + defer eventStore.Close() + sbxEventHandlers := event.NewEventHandlers(ctx, eventStore) + + sbxEventServer := event.NewSandboxEventServer(sbxEventServerPort, sbxEventHandlers) + defer sbxEventServer.Close(ctx) + networkPool, err := network.NewPool(ctx, tel.MeterProvider, network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, clientID, tracer) if err != nil { zap.L().Fatal("failed to create network pool", zap.Error(err)) @@ -245,7 +272,7 @@ func run(port, proxyPort uint) (success bool) { zap.L().Fatal("failed to create sandbox observer", zap.Error(err)) } - _, err = server.New(ctx, grpcSrv, tel, networkPool, devicePool, tracer, serviceInfo, sandboxProxy, sandboxes, featureFlags) + _, err = server.New(ctx, grpcSrv, tel, networkPool, devicePool, tracer, serviceInfo, sandboxProxy, sandboxes, featureFlags, eventStore) if err != nil { zap.L().Fatal("failed to create server", zap.Error(err)) } @@ -321,6 +348,22 @@ func run(port, proxyPort uint) (success bool) { return nil }) + g.Go(func() error { + sbxEventServerErr := sbxEventServer.Start() + if sbxEventServerErr != nil && !errors.Is(sbxEventServerErr, http.ErrServerClosed) { + select { + case serviceError <- sbxEventServerErr: + default: + // Don't block if the serviceError channel is already closed + // or if the error is already sent + } + + return sbxEventServerErr + } + + return nil + }) + g.Go(func() (err error) { // this sets the error declared above so the function // in the defer can check it. diff --git a/spec/openapi.yml b/spec/openapi.yml index 51c95d7dce..4273cc87a9 100644 --- a/spec/openapi.yml +++ b/spec/openapi.yml @@ -224,6 +224,22 @@ components: type: integer format: int64 description: Total memory in bytes + + SandboxEvent: + required: + - timestamp + - path + properties: + timestamp: + type: integer + format: int64 + description: Unix timestamp of the event in UTC + path: + type: string + description: Path of the event + body: + type: object + description: Body of the event Sandbox: required: @@ -1187,6 +1203,49 @@ paths: "500": $ref: "#/components/responses/500" + /sandboxes/{sandboxID}/events: + get: + description: Get sandbox events + tags: [sandboxes] + security: + - ApiKeyAuth: [] + - Supabase1TokenAuth: [] + Supabase2TeamAuth: [] + parameters: + - $ref: "#/components/parameters/sandboxID" + - in: query + name: offset + schema: + type: integer + format: int32 + minimum: 0 + default: 0 + description: Number of events to skip + - in: query + name: limit + schema: + type: integer + format: int32 + minimum: 1 + default: 100 + maximum: 1000 + description: Maximum number of events to return + responses: + "200": + description: Sandbox events + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/SandboxEvent" + "401": + $ref: "#/components/responses/401" + "404": + $ref: "#/components/responses/404" + "500": + $ref: "#/components/responses/500" + /sandboxes/{sandboxID}/refreshes: post: description: Refresh the sandbox extending its time to live