Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 9 additions & 65 deletions runs/repository/impl/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,78 +53,22 @@ func NewActionRepo(db *gorm.DB) interfaces.ActionRepo {
}

// CreateRun creates a new run (root action with parent_action_name = null)
func (r *actionRepo) CreateRun(ctx context.Context, req *workflow.CreateRunRequest) (*models.Run, error) {
// Determine run ID
var runID *common.RunIdentifier
switch id := req.Id.(type) {
case *workflow.CreateRunRequest_RunId:
runID = id.RunId
case *workflow.CreateRunRequest_ProjectId:
// Generate a run name (simplified - in production, use a better generator)
runID = &common.RunIdentifier{
Org: id.ProjectId.Organization,
Project: id.ProjectId.Name,
Domain: id.ProjectId.Domain,
Name: fmt.Sprintf("run-%d", time.Now().Unix()),
}
default:
return nil, fmt.Errorf("invalid run ID type")
}

// Build ActionSpec from CreateRunRequest
actionSpec := &workflow.ActionSpec{
ActionId: &common.ActionIdentifier{
Run: runID,
Name: runID.Name, // For root actions, action name = run name
},
ParentActionName: nil, // NULL for root actions
RunSpec: req.RunSpec,
InputUri: "", // TODO: build from inputs
RunOutputBase: "", // TODO: build output path
}

// Set the task spec based on the request
switch taskSpec := req.Task.(type) {
case *workflow.CreateRunRequest_TaskSpec:
actionSpec.Spec = &workflow.ActionSpec_Task{
Task: &workflow.TaskAction{
Spec: taskSpec.TaskSpec,
},
}
case *workflow.CreateRunRequest_TaskId:
actionSpec.Spec = &workflow.ActionSpec_Task{
Task: &workflow.TaskAction{
Id: taskSpec.TaskId,
},
}
}

// Serialize the ActionSpec to JSON
actionSpecBytes, err := json.Marshal(actionSpec)
if err != nil {
return nil, fmt.Errorf("failed to marshal action spec: %w", err)
}

// Create root action (represents the run)
run := &models.Run{
Org: runID.Org,
Project: runID.Project,
Domain: runID.Domain,
Name: runID.Name,
ParentActionName: nil, // NULL for root actions/runs
Phase: "PHASE_QUEUED",
ActionSpec: datatypes.JSON(actionSpecBytes),
ActionDetails: datatypes.JSON([]byte("{}")), // Empty details initially
}

func (r *actionRepo) CreateRun(ctx context.Context, run *models.Run) (*models.Run, error) {
// Save to database
if err := r.db.WithContext(ctx).Create(run).Error; err != nil {
return nil, fmt.Errorf("failed to create run: %w", err)
}

logger.Infof(ctx, "Created run: %s/%s/%s/%s (ID: %d)",
run.Org, run.Project, run.Domain, run.Name, run.ID)

// Notify subscribers of run creation
// Notify subscribers
runID := &common.RunIdentifier{
Org: run.Org,
Project: run.Project,
Domain: run.Domain,
Name: run.Name,
}
r.notifyRunUpdate(ctx, runID)

return run, nil
Expand Down
2 changes: 1 addition & 1 deletion runs/repository/interfaces/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// ActionRepo defines the interface for actions/runs data access
type ActionRepo interface {
// Run operations
CreateRun(ctx context.Context, req *workflow.CreateRunRequest) (*models.Run, error)
CreateRun(ctx context.Context, run *models.Run) (*models.Run, error)
GetRun(ctx context.Context, runID *common.RunIdentifier) (*models.Run, error)
ListRuns(ctx context.Context, req *workflow.ListRunsRequest) ([]*models.Run, string, error)
AbortRun(ctx context.Context, runID *common.RunIdentifier, reason string, abortedBy *common.EnrichedIdentity) error
Expand Down
30 changes: 15 additions & 15 deletions runs/repository/mocks/action_repo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

144 changes: 144 additions & 0 deletions runs/repository/transformers/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package transformers

import (
"context"
"fmt"
"time"

"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/datatypes"

"github.com/flyteorg/flyte/v2/flytestdlib/logger"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow"
"github.com/flyteorg/flyte/v2/runs/repository/models"
)

const InitialPhase = "PHASE_QUEUED"

// CreateRunRequestToModel converts CreateRunRequest protobuf to Run domain model
func CreateRunRequestToModel(ctx context.Context, req *workflow.CreateRunRequest) (*models.Run, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer isolate service layer and repository layer to prevent the transformer function depends on service request. That is, let's not pass the CreateRunRequest and transform here. Instead, do things in service layer, and just pass runID and actionSpec into here. Like what we did in:

taskId := request.GetTaskId()
taskModel, err := transformers.NewTaskModel(ctx, taskId, taskSpec)

In this case, we can reuse the run proto to run model transform function in other places rather than only for create run function

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh~ I see. No problem

// Determine run ID
var runID *common.RunIdentifier
switch id := req.Id.(type) {
case *workflow.CreateRunRequest_RunId:
runID = id.RunId
case *workflow.CreateRunRequest_ProjectId:
// Generate a run name
runID = &common.RunIdentifier{
Org: id.ProjectId.Organization,
Project: id.ProjectId.Name,
Domain: id.ProjectId.Domain,
Name: fmt.Sprintf("run-%d", time.Now().Unix()),
}
logger.Debugf(ctx, "Generated run name: %s", runID.Name)
default:
return nil, fmt.Errorf("invalid run ID type")
}

// Build ActionSpec
actionSpec := &workflow.ActionSpec{
ActionId: &common.ActionIdentifier{
Run: runID,
Name: runID.Name,
},
ParentActionName: nil,
RunSpec: req.RunSpec,
InputUri: "",
RunOutputBase: "",
}

// Set the task spec
switch taskSpec := req.Task.(type) {
case *workflow.CreateRunRequest_TaskSpec:
actionSpec.Spec = &workflow.ActionSpec_Task{
Task: &workflow.TaskAction{
Spec: taskSpec.TaskSpec,
},
}
case *workflow.CreateRunRequest_TaskId:
actionSpec.Spec = &workflow.ActionSpec_Task{
Task: &workflow.TaskAction{
Id: taskSpec.TaskId,
},
}
}

// Serialize ActionSpec
actionSpecBytes, err := protojson.Marshal(actionSpec)
if err != nil {
logger.Errorf(ctx, "Failed to marshal ActionSpec: %v", err)
return nil, fmt.Errorf("failed to marshal action spec: %w", err)
}

// Create Run model
run := &models.Run{
Org: runID.Org,
Project: runID.Project,
Domain: runID.Domain,
Name: runID.Name,
ParentActionName: nil,
Phase: InitialPhase,
ActionSpec: datatypes.JSON(actionSpecBytes),
ActionDetails: datatypes.JSON([]byte("{}")), // Empty details initially
}

logger.Infof(ctx, "Created run model: %s/%s/%s/%s", run.Org, run.Project, run.Domain, run.Name)
return run, nil
}

// RunModelToCreateRunResponse converts a domain model Run to a CreateRunResponse
func RunModelToCreateRunResponse(run *models.Run, source workflow.RunSource) *workflow.CreateRunResponse {
if run == nil {
return nil
}

// Build the action identifier
actionID := &common.ActionIdentifier{
Run: &common.RunIdentifier{
Org: run.Org,
Project: run.Project,
Domain: run.Domain,
Name: run.Name,
},
Name: run.Name, // For root actions, action name = run name
}

// Build action status
actionStatus := &workflow.ActionStatus{
Phase: DBPhaseToProtobufPhase(run.Phase),
StartTime: timestamppb.New(run.CreatedAt),
Attempts: 0,
CacheStatus: core.CatalogCacheStatus_CACHE_DISABLED,
}

// Build action metadata
actionMetadata := &workflow.ActionMetadata{
Source: source, // ← Use the passed-in source
Parent: "",
ActionType: workflow.ActionType_ACTION_TYPE_TASK,
}

// Build the complete response
return &workflow.CreateRunResponse{
Run: &workflow.Run{
Action: &workflow.Action{
Id: actionID,
Status: actionStatus,
Metadata: actionMetadata,
},
},
}
}

func DBPhaseToProtobufPhase(dbPhase string) common.ActionPhase {
protoPhaseStr := "ACTION_" + dbPhase // "PHASE_QUEUED" → "ACTION_PHASE_QUEUED"

if val, ok := common.ActionPhase_value[protoPhaseStr]; ok {
return common.ActionPhase(val)
}

return common.ActionPhase_ACTION_PHASE_UNSPECIFIED
}
42 changes: 23 additions & 19 deletions runs/service/run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow/workflowconnect"
"github.com/flyteorg/flyte/v2/runs/repository/interfaces"
"github.com/flyteorg/flyte/v2/runs/repository/models"
"github.com/flyteorg/flyte/v2/runs/repository/transformers"
)

// RunService implements the RunServiceHandler interface
Expand Down Expand Up @@ -49,10 +50,17 @@ func (s *RunService) CreateRun(
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

// Create run in database
run, err := s.repo.ActionRepo().CreateRun(ctx, req.Msg)
// Transform protobuf → domain model
runModel, err := transformers.CreateRunRequestToModel(ctx, req.Msg)
if err != nil {
logger.Errorf(ctx, "Failed to create run: %v", err)
logger.Errorf(ctx, "Failed to transform CreateRunRequest: %v", err)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

// Pass domain model to repositorym
run, err := s.repo.ActionRepo().CreateRun(ctx, runModel)
if err != nil {
logger.Errorf(ctx, "Failed to create run: %v", err)
return nil, connect.NewError(connect.CodeInternal, err)
}

Expand Down Expand Up @@ -91,25 +99,21 @@ func (s *RunService) CreateRun(
}
}

// Call queue service to enqueue the root action
_, err = s.queueClient.EnqueueAction(ctx, connect.NewRequest(enqueueReq))
if err != nil {
logger.Errorf(ctx, "Failed to enqueue root action: %v", err)
// Note: We don't fail the CreateRun if enqueue fails - the run is already created
// In production, you might want to mark the run as failed or retry the enqueue
logger.Warnf(ctx, "Run %s created but failed to enqueue root action", run.Name)
// Call queue service to enqueue the root action (if client is configured)
if s.queueClient != nil {
_, err = s.queueClient.EnqueueAction(ctx, connect.NewRequest(enqueueReq))
if err != nil {
logger.Errorf(ctx, "Failed to enqueue root action: %v", err)
// Note: We don't fail the CreateRun if enqueue fails - the run is already created
logger.Warnf(ctx, "Run %s created but failed to enqueue root action", run.Name)
} else {
logger.Infof(ctx, "Successfully enqueued root action for run %s", run.Name)
}
} else {
logger.Infof(ctx, "Successfully enqueued root action for run %s", run.Name)
logger.Warnf(ctx, "Queue client not configured, run %s created but not enqueued", run.Name)
}

// Build response (simplified - you'd convert the full Run model)
resp := &workflow.CreateRunResponse{
Run: &workflow.Run{
Action: &workflow.Action{
Id: actionID,
},
},
}
resp := transformers.RunModelToCreateRunResponse(run, req.Msg.Source)

return connect.NewResponse(resp), nil
}
Expand Down
Loading