Skip to content

Commit c2971cb

Browse files
authored
Update execution server to allow storing metadata in Redis (#9357)
Introduces 3 flags to the execution server:

- `remote_execution.write_execution_progress_state_to_redis`: if enabled, writes execution updates and invocation-execution links to Redis, using the new methods added in 9b6419d.
- `remote_execution.write_executions_to_primary_db`: if enabled, writes Execution rows and InvocationExecution rows to MySQL. Defaults to true in order to preserve our current behavior.
- `remote_execution.read_final_execution_state_from_redis`: if enabled, reads the final execution state from Redis instead of MySQL when we write executions to ClickHouse.

The rollout plan to fully stop writing executions to MySQL would then be:

- Rollout 1: enable `remote_execution.write_execution_progress_state_to_redis`
- Rollout 2: enable `remote_execution.read_final_execution_state_from_redis`
- Rollout 3: disable `remote_execution.write_executions_to_primary_db`

This PR only affects the execution server; the UI endpoint (execution_service) also needs to be updated to read the in-progress execution state from Redis, which will be done in a separate PR.
1 parent a971408 commit c2971cb

File tree

3 files changed

+101
-14
lines changed

3 files changed

+101
-14
lines changed

enterprise/server/remote_execution/execution_server/execution_server.go

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/operation"
1717
"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/platform"
1818
"github.com/buildbuddy-io/buildbuddy/enterprise/server/tasksize"
19-
"github.com/buildbuddy-io/buildbuddy/enterprise/server/util/execution"
2019
"github.com/buildbuddy-io/buildbuddy/proto/invocation_status"
2120
"github.com/buildbuddy-io/buildbuddy/server/environment"
2221
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
@@ -48,6 +47,7 @@ import (
4847
"google.golang.org/protobuf/types/known/anypb"
4948
"google.golang.org/protobuf/types/known/timestamppb"
5049

50+
executil "github.com/buildbuddy-io/buildbuddy/enterprise/server/util/execution"
5151
espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats"
5252
repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
5353
scpb "github.com/buildbuddy-io/buildbuddy/proto/scheduler"
@@ -78,6 +78,10 @@ const (
7878
var (
7979
enableRedisAvailabilityMonitoring = flag.Bool("remote_execution.enable_redis_availability_monitoring", false, "If enabled, the execution server will detect if Redis has lost state and will ask Bazel to retry executions.")
8080
sharedExecutorPoolTeeRate = flag.Float64("remote_execution.shared_executor_pool_tee_rate", 0, "If non-zero, work for the default shared executor pool will be teed to a separate experiment pool at this rate.", flag.Internal)
81+
82+
writeExecutionProgressStateToRedis = flag.Bool("remote_execution.write_execution_progress_state_to_redis", false, "If enabled, write initial execution metadata and progress updates (stage changes) to redis. This state is cleared when the execution is complete.", flag.Internal)
83+
writeExecutionsToPrimaryDB = flag.Bool("remote_execution.write_executions_to_primary_db", true, "If enabled, write executions and invocation-execution links to the primary DB.", flag.Internal)
84+
readFinalExecutionStateFromRedis = flag.Bool("remote_execution.read_final_execution_state_from_redis", false, "If enabled, read execution metadata and progress state from redis when writing executions to ClickHouse, instead of from the primary DB.", flag.Internal)
8185
)
8286

8387
func fillExecutionFromActionMetadata(md *repb.ExecutedActionMetadata, execution *tables.Execution) {
@@ -219,7 +223,20 @@ func (s *ExecutionServer) insertExecution(ctx context.Context, executionID, invo
219223
execution.GroupID = permissions.GroupID
220224
execution.Perms = execution.Perms | permissions.Perms
221225

222-
return s.env.GetDBHandle().NewQuery(ctx, "execution_server_create_execution").Create(execution)
226+
if *writeExecutionProgressStateToRedis {
227+
now := time.Now()
228+
execution.Model.CreatedAtUsec = now.UnixMicro()
229+
execution.Model.UpdatedAtUsec = now.UnixMicro()
230+
if err := s.env.GetExecutionCollector().UpdateInProgressExecution(ctx, executil.TableExecToProto(execution, nil /*=invocationLink*/)); err != nil {
231+
log.CtxErrorf(ctx, "Failed to write execution update to redis: %s", err)
232+
}
233+
}
234+
235+
if *writeExecutionsToPrimaryDB {
236+
return s.env.GetDBHandle().NewQuery(ctx, "execution_server_create_execution").Create(execution)
237+
}
238+
239+
return nil
223240
}
224241

225242
func (s *ExecutionServer) insertInvocationLink(ctx context.Context, executionID, invocationID string, linkType sipb.StoredInvocationLink_Type) error {
@@ -241,6 +258,10 @@ func (s *ExecutionServer) insertInvocationLink(ctx context.Context, executionID,
241258
log.CtxWarningf(ctx, "failed to add invocation link(exeuction_id: %q invocation_id: %q, link_type: %d) in redis", executionID, invocationID, linkType)
242259
}
243260

261+
if !*writeExecutionsToPrimaryDB {
262+
return nil
263+
}
264+
244265
link := &tables.InvocationExecution{
245266
InvocationID: invocationID,
246267
ExecutionID: executionID,
@@ -265,7 +286,11 @@ func (s *ExecutionServer) insertInvocationLinkInRedis(ctx context.Context, execu
265286
ExecutionId: executionID,
266287
Type: linkType,
267288
}
268-
return s.env.GetExecutionCollector().AddExecutionInvocationLink(ctx, link, false /*=storeReverseLink*/)
289+
// Only store the invocation => execution link if we're also writing
290+
// execution progress state to redis, since these links are only used to
291+
// list the in-progress state by invocation ID.
292+
storeInvocationExecutionLink := *writeExecutionProgressStateToRedis
293+
return s.env.GetExecutionCollector().AddExecutionInvocationLink(ctx, link, storeInvocationExecutionLink)
269294
}
270295

271296
func trimStatus(statusMessage string) string {
@@ -315,6 +340,17 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin
315340
}
316341
}
317342

343+
if *writeExecutionProgressStateToRedis {
344+
execution.Model.UpdatedAtUsec = time.Now().UnixMicro()
345+
executionProto := executil.TableExecToProto(execution, nil /*=invocationLink*/)
346+
if err := s.env.GetExecutionCollector().UpdateInProgressExecution(ctx, executionProto); err != nil {
347+
log.CtxErrorf(ctx, "Failed to write execution update to redis: %s", err)
348+
}
349+
}
350+
351+
if !*writeExecutionsToPrimaryDB {
352+
return nil
353+
}
318354
result := s.env.GetDBHandle().GORM(ctx, "execution_server_update_execution").Where(
319355
"execution_id = ? AND stage != ?", executionID, repb.ExecutionStage_COMPLETED).Updates(execution)
320356
dbErr := result.Error
@@ -349,17 +385,38 @@ func (s *ExecutionServer) recordExecution(
349385
if s.env.GetDBHandle() == nil {
350386
return status.FailedPreconditionError("database not configured")
351387
}
352-
var executionPrimaryDB tables.Execution
353-
354-
if err := s.env.GetDBHandle().NewQuery(ctx, "execution_server_lookup_execution").Raw(
355-
`SELECT * FROM "Executions" WHERE execution_id = ?`, executionID).Take(&executionPrimaryDB); err != nil {
356-
return status.InternalErrorf("failed to look up execution %q: %s", executionID, err)
388+
var executionProto *repb.StoredExecution
389+
// Read final execution details (including the initial metadata and
390+
// completed execution result) from either redis or the primary DB. This
391+
// state does not have the invocation details populated.
392+
if *readFinalExecutionStateFromRedis {
393+
ex, err := s.env.GetExecutionCollector().GetInProgressExecution(ctx, executionID)
394+
if err != nil {
395+
return status.InternalErrorf("failed to get execution %q from redis: %s", executionID, err)
396+
}
397+
if ex == nil {
398+
return status.NotFoundErrorf("execution %q not found in redis", executionID)
399+
}
400+
executionProto = ex
401+
} else {
402+
var executionPrimaryDB tables.Execution
403+
if err := s.env.GetDBHandle().NewQuery(ctx, "execution_server_lookup_execution").Raw(
404+
`SELECT * FROM "Executions" WHERE execution_id = ?`, executionID).Take(&executionPrimaryDB); err != nil {
405+
return status.InternalErrorf("failed to look up execution %q: %s", executionID, err)
406+
}
407+
executionProto = executil.TableExecToProto(&executionPrimaryDB, nil)
357408
}
358-
// Always clean up invocationLinks in Collector because we are not retrying
409+
// Always clean up invocationLinks and execution updates from the collector.
410+
// The execution cannot be retried after this point, so nothing will clean
411+
// up this data if we don't do it here.
359412
defer func() {
360413
err := s.env.GetExecutionCollector().DeleteExecutionInvocationLinks(ctx, executionID)
361414
if err != nil {
362-
log.CtxErrorf(ctx, "failed to clean up invocation links in collector: %s", err)
415+
log.CtxErrorf(ctx, "Failed to clean up invocation links in collector: %s", err)
416+
}
417+
err = s.env.GetExecutionCollector().DeleteInProgressExecution(ctx, executionID)
418+
if err != nil {
419+
log.CtxErrorf(ctx, "Failed to clean up in-progress execution in collector: %s", err)
363420
}
364421
}()
365422
links, err := s.env.GetExecutionCollector().GetExecutionInvocationLinks(ctx, executionID)
@@ -368,7 +425,9 @@ func (s *ExecutionServer) recordExecution(
368425
}
369426
rmd := bazel_request.GetRequestMetadata(ctx)
370427
for _, link := range links {
371-
executionProto := execution.TableExecToProto(&executionPrimaryDB, link)
428+
executionProto := executionProto.CloneVT()
429+
executil.SetInvocationLink(executionProto, link)
430+
372431
// Set fields that aren't stored in the primary DB
373432
executionProto.TargetLabel = rmd.GetTargetId()
374433
executionProto.ActionMnemonic = rmd.GetActionMnemonic()

enterprise/server/remote_execution/execution_server/execution_server_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,25 @@ func TestExecuteAndPublishOperation(t *testing.T) {
399399
expectedExecutionUsage: tables.UsageCounts{LinuxExecutionDurationUsec: durationUsec},
400400
publishMoreMetadata: true,
401401
},
402+
{
403+
name: "PublishMoreMetadata_PrimaryDBAndRedisDoubleWrite",
404+
expectedExecutionUsage: tables.UsageCounts{LinuxExecutionDurationUsec: durationUsec},
405+
publishMoreMetadata: true,
406+
flagOverrides: map[string]any{
407+
"remote_execution.write_execution_progress_state_to_redis": true,
408+
"remote_execution.write_executions_to_primary_db": true,
409+
},
410+
},
411+
{
412+
name: "PublishMoreMetadata_NoPrimaryDB_RedisOnly",
413+
expectedExecutionUsage: tables.UsageCounts{LinuxExecutionDurationUsec: durationUsec},
414+
publishMoreMetadata: true,
415+
flagOverrides: map[string]any{
416+
"remote_execution.write_execution_progress_state_to_redis": true,
417+
"remote_execution.write_executions_to_primary_db": false,
418+
"remote_execution.read_final_execution_state_from_redis": true,
419+
},
420+
},
402421
} {
403422
t.Run(test.name, func(t *testing.T) {
404423
testExecuteAndPublishOperation(t, test)
@@ -409,6 +428,7 @@ func TestExecuteAndPublishOperation(t *testing.T) {
409428
type publishTest struct {
410429
name string
411430
platformOverrides map[string]string
431+
flagOverrides map[string]any
412432
expectedSelfHosted bool
413433
expectedExecutionUsage tables.UsageCounts
414434
cachedResult, doNotCache bool
@@ -420,6 +440,9 @@ type publishTest struct {
420440
func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
421441
ctx := context.Background()
422442
flags.Set(t, "app.enable_write_executions_to_olap_db", true)
443+
for k, v := range test.flagOverrides {
444+
flags.Set(t, k, v)
445+
}
423446
env, conn := setupEnv(t)
424447
redis := testredis.Start(t)
425448
env.SetDefaultRedisClient(redis.Client())

enterprise/server/util/execution/execution.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@ import (
1919
)
2020

2121
func TableExecToProto(in *tables.Execution, invLink *sipb.StoredInvocationLink) *repb.StoredExecution {
22-
return &repb.StoredExecution{
22+
ex := &repb.StoredExecution{
2323
GroupId: in.GroupID,
2424
UpdatedAtUsec: in.UpdatedAtUsec,
2525
ExecutionId: in.ExecutionID,
26-
InvocationUuid: strings.Replace(invLink.GetInvocationId(), "-", "", -1),
27-
InvocationLinkType: int32(invLink.GetType()),
2826
CreatedAtUsec: in.CreatedAtUsec,
2927
UserId: in.UserID,
3028
Worker: in.Worker,
@@ -55,6 +53,13 @@ func TableExecToProto(in *tables.Execution, invLink *sipb.StoredInvocationLink)
5553
DoNotCache: in.DoNotCache,
5654
CommandSnippet: in.CommandSnippet,
5755
}
56+
SetInvocationLink(ex, invLink)
57+
return ex
58+
}
59+
60+
// SetInvocationLink copies the details of invLink onto ex, overwriting two
// fields: ex.InvocationUuid is set to the link's invocation ID with every "-"
// removed, and ex.InvocationLinkType is set to the link's type converted to
// int32. No other field of ex is touched.
//
// NOTE(review): TableExecToProto forwards its invLink argument here and is
// called with a nil link elsewhere, so this presumably relies on the generated
// GetInvocationId/GetType getters tolerating a nil receiver — confirm.
func SetInvocationLink(ex *repb.StoredExecution, invLink *sipb.StoredInvocationLink) {
61+
ex.InvocationUuid = strings.Replace(invLink.GetInvocationId(), "-", "", -1)
62+
ex.InvocationLinkType = int32(invLink.GetType())
5863
}
5964

6065
func TableExecToClientProto(in *tables.Execution) (*espb.Execution, error) {

0 commit comments

Comments
 (0)