Skip to content
This repository was archived by the owner on May 31, 2024. It is now read-only.

Commit 35de5b8

Browse files
authored
Hydrate failure node (#456)
Signed-off-by: Kevin Su <pingsutw@gmail.com>
1 parent 54ddddb commit 35de5b8

File tree

4 files changed

+37
-2
lines changed

4 files changed

+37
-2
lines changed

cmd/register/files_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,31 @@ func TestRegisterFromFiles(t *testing.T) {
6060
err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx)
6161
assert.Nil(t, err)
6262
})
63+
t.Run("Register a workflow with a failure node", func(t *testing.T) {
64+
s := setup()
65+
testScope := promutils.NewTestScope()
66+
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey)
67+
registerFilesSetup()
68+
rconfig.DefaultFilesConfig.Archive = true
69+
rconfig.DefaultFilesConfig.OutputLocationPrefix = s3Output
70+
rconfig.DefaultFilesConfig.DeprecatedSourceUploadPath = s3Output
71+
mockStorage, err := storage.NewDataStore(&storage.Config{
72+
Type: storage.TypeMemory,
73+
}, testScope.NewSubScope("flytectl"))
74+
assert.Nil(t, err)
75+
Client = mockStorage
76+
77+
args := []string{"testdata/failure-node.tgz"}
78+
s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil)
79+
s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil)
80+
s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil)
81+
s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil)
82+
mockDataProxy := s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient)
83+
mockDataProxy.OnCreateUploadLocationMatch(s.Ctx, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil)
84+
85+
err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx)
86+
assert.Nil(t, err)
87+
})
6388
t.Run("Failed fast registration while uploading the codebase", func(t *testing.T) {
6489
s := setup()
6590
registerFilesSetup()

cmd/register/register.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
// Long descriptions are whitespace sensitive when generating docs using sphinx.
1111
const (
1212
registerCmdShort = "Registers tasks, workflows, and launch plans from a list of generated serialized files."
13-
registercmdLong = `
13+
registerCmdLong = `
1414
Take input files as serialized versions of the tasks/workflows/launchplans and register them with FlyteAdmin.
1515
Currently, these input files are protobuf files generated as output from Flytekit serialize.
1616
Project and Domain are mandatory fields to be passed for registration and an optional version which defaults to v1.
@@ -23,7 +23,7 @@ func RemoteRegisterCommand() *cobra.Command {
2323
registerCmd := &cobra.Command{
2424
Use: "register",
2525
Short: registerCmdShort,
26-
Long: registercmdLong,
26+
Long: registerCmdLong,
2727
}
2828
registerResourcesFuncs := map[string]cmdcore.CommandEntry{
2929
"files": {CmdFunc: registerFromFilesFunc, Aliases: []string{"file"}, PFlagProvider: rconfig.DefaultFilesConfig,

cmd/register/register_util.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,13 +491,23 @@ func hydrateSpec(message proto.Message, uploadLocation storage.DataReference, co
491491
return err
492492
}
493493
}
494+
if workflowSpec.Template.GetFailureNode() != nil {
495+
if err := hydrateNode(workflowSpec.Template.GetFailureNode(), config.Version, config.Force); err != nil {
496+
return err
497+
}
498+
}
494499
hydrateIdentifier(workflowSpec.Template.Id, config.Version, config.Force)
495500
for _, subWorkflow := range workflowSpec.SubWorkflows {
496501
for _, Noderef := range subWorkflow.Nodes {
497502
if err := hydrateNode(Noderef, config.Version, config.Force); err != nil {
498503
return err
499504
}
500505
}
506+
if subWorkflow.GetFailureNode() != nil {
507+
if err := hydrateNode(subWorkflow.GetFailureNode(), config.Version, config.Force); err != nil {
508+
return err
509+
}
510+
}
501511
hydrateIdentifier(subWorkflow.Id, config.Version, config.Force)
502512
}
503513
case *admin.TaskSpec:
1.63 KB
Binary file not shown.

0 commit comments

Comments
 (0)