From ec99ee536c0fb1d3f1a2fad8a1ecde6a6c7a6b58 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Thu, 12 Jun 2025 01:24:19 +1000 Subject: [PATCH 1/2] feat(operation): support execution of dynamic operation To enhance the flexibility of Operations API, we want to support dynamic execution of Operation. Imagine this snippet of code: ``` inputs := []any{"input1", 42} deps := []any{Deps1{}, Deps2{}} defs := []Definition{ {ID: "string-op", Version: semver.MustParse("1.0.0")}, {ID: "int-op", Version: semver.MustParse("1.0.0")}, } // dynamically retrieve and execute operations on different inputs for i, def := range defs { // registry contains a list of operations retrievedOp, _ := registry.Retrieve(def) // dependency and input are dynamic/different per Operation report, _ := ExecuteOperation(b, retrievedOp, deps[i], inputs[i]) } ``` Based on the definition, we can decide on run time what operations to execute and what input and deps to pass in to that operation. This can be quite powerful to allow users to dynamically construct a series of operation under a sequence. JIRA: https://smartcontract-it.atlassian.net/browse/CLD-354 --- .changeset/moody-pots-find.md | 5 ++ operations/operation.go | 71 +++++++++++++-- operations/operation_test.go | 65 ++++++++++++++ operations/registry.go | 28 ++++++ operations/registry_test.go | 158 ++++++++++++++++++++++++++++++++++ 5 files changed, 320 insertions(+), 7 deletions(-) create mode 100644 .changeset/moody-pots-find.md create mode 100644 operations/registry.go create mode 100644 operations/registry_test.go diff --git a/.changeset/moody-pots-find.md b/.changeset/moody-pots-find.md new file mode 100644 index 0000000..790cfc3 --- /dev/null +++ b/.changeset/moody-pots-find.md @@ -0,0 +1,5 @@ +--- +"chainlink-deployments-framework": minor +--- + +feat: support dynamic execution of operation diff --git a/operations/operation.go b/operations/operation.go index 9013e3f..bdb327e 100644 --- a/operations/operation.go +++ b/operations/operation.go @@ -2,6 +2,7 @@ package operations import ( "context" + "errors" "sync" "github.com/Masterminds/semver/v3" @@ -16,17 +17,36 @@ type Bundle struct { GetContext func() context.Context reporter Reporter // internal use only, for storing the hash of the report to avoid repeat sha256 computation. - reportHashCache *sync.Map + reportHashCache *sync.Map + OperationRegistry OperationRegistry +} + +// BundleOption is a functional option for configuring a Bundle +type BundleOption func(*Bundle) + +// WithOperationRegistry sets a custom OperationRegistry for the Bundle +func WithOperationRegistry(registry OperationRegistry) BundleOption { + return func(b *Bundle) { + b.OperationRegistry = registry + } } // NewBundle creates and returns a new Bundle. -func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter) Bundle { - return Bundle{ - Logger: logger, - GetContext: getContext, - reporter: reporter, - reportHashCache: &sync.Map{}, +func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter, opts ...BundleOption) Bundle { + b := Bundle{ + Logger: logger, + GetContext: getContext, + reporter: reporter, + reportHashCache: &sync.Map{}, + OperationRegistry: NewOperationRegistry(), } + + // Apply all provided options + for _, opt := range opts { + opt(&b) + } + + return b } // OperationHandler is the function signature of an operation handler. @@ -66,6 +86,11 @@ func (o *Operation[IN, OUT, DEP]) Description() string { return o.def.Description } +// Def returns the operation definition. +func (o *Operation[IN, OUT, DEP]) Def() Definition { + return o.def +} + // execute runs the operation by calling the OperationHandler. func (o *Operation[IN, OUT, DEP]) execute(b Bundle, deps DEP, input IN) (output OUT, err error) { b.Logger.Infow("Executing operation", @@ -74,6 +99,38 @@ func (o *Operation[IN, OUT, DEP]) execute(b Bundle, deps DEP, input IN) (output return o.handler(b, deps, input) } +// AsUntyped converts the operation to an untyped operation. +// This is useful for storing operations in a slice or passing them around without type constraints. +// Warning: The input and output types will be converted to `any`, so type safety is lost. +func (o *Operation[IN, OUT, DEP]) AsUntyped() *Operation[any, any, any] { + return &Operation[any, any, any]{ + def: Definition{ + ID: o.def.ID, + Version: o.def.Version, + Description: o.def.Description, + }, + handler: func(b Bundle, deps any, input any) (any, error) { + var typedInput IN + if input != nil { + var ok bool + if typedInput, ok = input.(IN); !ok { + return nil, errors.New("input type mismatch") + } + } + + var typedDeps DEP + if deps != nil { + var ok bool + if typedDeps, ok = deps.(DEP); !ok { + return nil, errors.New("dependencies type mismatch") + } + } + + return o.execute(b, typedDeps, typedInput) + }, + } +} + // NewOperation creates a new operation. // Version can be created using semver.MustParse("1.0.0") or semver.New("1.0.0"). // Note: The handler should only perform maximum 1 side effect. diff --git a/operations/operation_test.go b/operations/operation_test.go index 93f8eda..25cc552 100644 --- a/operations/operation_test.go +++ b/operations/operation_test.go @@ -32,6 +32,7 @@ func Test_NewOperation(t *testing.T) { assert.Equal(t, "sum", op.ID()) assert.Equal(t, version.String(), op.Version()) assert.Equal(t, description, op.Description()) + assert.Equal(t, op.def, op.Def()) res, err := op.handler(Bundle{}, OpDeps{}, OpInput{1, 2}) require.NoError(t, err) assert.Equal(t, 3, res) @@ -82,3 +83,67 @@ func Test_Operation_WithEmptyInput(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, out) } + +func Test_Operation_AsUntyped(t *testing.T) { + t.Parallel() + + version := semver.MustParse("1.0.0") + description := "test operation" + handler1 := func(b Bundle, deps OpDeps, input OpInput) (output int, err error) { + return input.A + input.B, nil + } + typedOp := NewOperation("sum", version, description, handler1) + + untypedOp := typedOp.AsUntyped() + bundle := NewBundle(context.Background, logger.Test(t), nil) + + assert.Equal(t, "sum", untypedOp.ID()) + assert.Equal(t, version.String(), untypedOp.Version()) + assert.Equal(t, description, untypedOp.Description()) + + tests := []struct { + name string + deps any + input any + wantResult any + wantErr bool + errContains string + }{ + { + name: "valid input and dependencies", + deps: OpDeps{}, + input: OpInput{A: 3, B: 4}, + wantResult: 7, + wantErr: false, + }, + { + name: "invalid input type", + deps: OpDeps{}, + input: struct{ C int }{C: 5}, + wantErr: true, + errContains: "input type mismatch", + }, + { + name: "invalid dependencies type", + deps: "invalid", + input: OpInput{A: 1, B: 2}, + wantErr: true, + errContains: "dependencies type mismatch", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result, err := untypedOp.handler(bundle, tt.deps, tt.input) + + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + } else { + require.NoError(t, err) + assert.Equal(t, tt.wantResult, result) + } + }) + } +} diff --git a/operations/registry.go b/operations/registry.go new file mode 100644 index 0000000..b18b589 --- /dev/null +++ b/operations/registry.go @@ -0,0 +1,28 @@ +package operations + +import "errors" + +// OperationRegistry is a store for operations that allows retrieval based on their definitions. +type OperationRegistry struct { + ops []*Operation[any, any, any] +} + +// NewOperationRegistry creates a new OperationRegistry with the provided untyped operations. +func NewOperationRegistry(ops ...*Operation[any, any, any]) OperationRegistry { + return OperationRegistry{ + ops: ops, + } +} + +// Retrieve retrieves an operation from the store based on its definition. +// It returns an error if the operation is not found. +// The definition must match the operation's ID and version. +func (s OperationRegistry) Retrieve(def Definition) (*Operation[any, any, any], error) { + for _, op := range s.ops { + if op.ID() == def.ID && op.Version() == def.Version.String() { + return op, nil + } + } + + return nil, errors.New("operation not found in registry") +} diff --git a/operations/registry_test.go b/operations/registry_test.go new file mode 100644 index 0000000..edec5b3 --- /dev/null +++ b/operations/registry_test.go @@ -0,0 +1,158 @@ +package operations + +import ( + "context" + "fmt" + "testing" + + "github.com/Masterminds/semver/v3" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// ExampleOperationRegistry demonstrates how to create and use an OperationRegistry +// with operations being executed dynamically with different input/output types. +func ExampleOperationRegistry() { + // example dependencies for operations + type Deps1 struct{} + type Deps2 struct{} + + // Create operations with different input/output types + stringOp := NewOperation( + "string-op", + semver.MustParse("1.0.0"), + "Echo string operation", + func(e Bundle, deps Deps1, input string) (string, error) { + return input, nil + }, + ) + + intOp := NewOperation( + "int-op", + semver.MustParse("1.0.0"), + "Echo integer operation", + func(e Bundle, deps Deps2, input int) (int, error) { + return input, nil + }, + ) + // Create registry with untyped operations + registry := NewOperationRegistry(stringOp.AsUntyped(), intOp.AsUntyped()) + + // Create execution environment + b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry)) + + // Define inputs and dependencies for operations + // inputs[0] is for stringOp, inputs[1] is for intOp + // deps[0] is for stringOp, deps[1] is for intOp + inputs := []any{"input1", 42} + deps := []any{Deps1{}, Deps2{}} + defs := []Definition{ + stringOp.Def(), + intOp.Def(), + } + + // dynamically retrieve and execute operations on different inputs + for i, def := range defs { + retrievedOp, err := registry.Retrieve(def) + if err != nil { + fmt.Println("error retrieving operation:", err) + continue + } + + report, err := ExecuteOperation(b, retrievedOp, deps[i], inputs[i]) + if err != nil { + fmt.Println("error executing operation:", err) + continue + } + + fmt.Println("operation output:", report.Output) + } + + // Output: + // operation output: input1 + // operation output: 42 +} + +func TestOperationRegistry_Retrieve(t *testing.T) { + t.Parallel() + + op1 := NewOperation( + "test-op-1", + semver.MustParse("1.0.0"), + "Operation 1", + func(e Bundle, deps OpDeps, input string) (string, error) { return input, nil }, + ) + op2 := NewOperation( + "test-op-2", + semver.MustParse("2.0.0"), + "Operation 2", + func(e Bundle, deps OpDeps, input int) (int, error) { return input * 2, nil }, + ) + + tests := []struct { + name string + operations []*Operation[any, any, any] + lookup Definition + wantErr bool + wantErrMsg string + wantID string + wantVersion string + }{ + { + name: "empty registry", + operations: nil, + lookup: Definition{ID: "test-op-1", Version: semver.MustParse("1.0.0")}, + wantErr: true, + wantErrMsg: "operation not found in registry", + }, + { + name: "retrieval by exact match - first operation", + operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()}, + lookup: Definition{ID: "test-op-1", Version: semver.MustParse("1.0.0")}, + wantErr: false, + wantID: "test-op-1", + wantVersion: "1.0.0", + }, + { + name: "retrieval by exact match - second operation", + operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()}, + lookup: Definition{ID: "test-op-2", Version: semver.MustParse("2.0.0")}, + wantErr: false, + wantID: "test-op-2", + wantVersion: "2.0.0", + }, + { + name: "operation not found - non-existent ID", + operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()}, + lookup: Definition{ID: "non-existent", Version: semver.MustParse("1.0.0")}, + wantErr: true, + wantErrMsg: "operation not found in registry", + }, + { + name: "operation not found - wrong version", + operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()}, + lookup: Definition{ID: "test-op-1", Version: semver.MustParse("3.0.0")}, + wantErr: true, + wantErrMsg: "operation not found in registry", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + registry := NewOperationRegistry(tt.operations...) + retrievedOp, err := registry.Retrieve(tt.lookup) + + if tt.wantErr { + require.Error(t, err) + assert.ErrorContains(t, err, tt.wantErrMsg) + } else { + require.NoError(t, err) + assert.Equal(t, tt.wantID, retrievedOp.ID()) + assert.Equal(t, tt.wantVersion, retrievedOp.Version()) + } + }) + } +} From f23f6b61b43423abf24c5061643b9ac2bba86c77 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Fri, 13 Jun 2025 17:52:14 +1000 Subject: [PATCH 2/2] fix: add helper to register operation without calling untyped --- operations/operation.go | 4 ++-- operations/registry.go | 13 +++++++++++-- operations/registry_test.go | 7 +++++-- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/operations/operation.go b/operations/operation.go index bdb327e..910fef1 100644 --- a/operations/operation.go +++ b/operations/operation.go @@ -18,14 +18,14 @@ type Bundle struct { reporter Reporter // internal use only, for storing the hash of the report to avoid repeat sha256 computation. reportHashCache *sync.Map - OperationRegistry OperationRegistry + OperationRegistry *OperationRegistry } // BundleOption is a functional option for configuring a Bundle type BundleOption func(*Bundle) // WithOperationRegistry sets a custom OperationRegistry for the Bundle -func WithOperationRegistry(registry OperationRegistry) BundleOption { +func WithOperationRegistry(registry *OperationRegistry) BundleOption { return func(b *Bundle) { b.OperationRegistry = registry } diff --git a/operations/registry.go b/operations/registry.go index b18b589..9fbb998 100644 --- a/operations/registry.go +++ b/operations/registry.go @@ -8,8 +8,8 @@ type OperationRegistry struct { } // NewOperationRegistry creates a new OperationRegistry with the provided untyped operations. -func NewOperationRegistry(ops ...*Operation[any, any, any]) OperationRegistry { - return OperationRegistry{ +func NewOperationRegistry(ops ...*Operation[any, any, any]) *OperationRegistry { + return &OperationRegistry{ ops: ops, } } @@ -26,3 +26,12 @@ func (s OperationRegistry) Retrieve(def Definition) (*Operation[any, any, any], return nil, errors.New("operation not found in registry") } + +// RegisterOperation registers new operations in the registry. +// To register operations with different input, output, and dependency types, +// call RegisterOperation multiple times with different type parameters. +func RegisterOperation[D, I, O any](r *OperationRegistry, op ...*Operation[D, I, O]) { + for _, o := range op { + r.ops = append(r.ops, o.AsUntyped()) + } +} diff --git a/operations/registry_test.go b/operations/registry_test.go index edec5b3..f542ec3 100644 --- a/operations/registry_test.go +++ b/operations/registry_test.go @@ -36,8 +36,11 @@ func ExampleOperationRegistry() { return input, nil }, ) - // Create registry with untyped operations - registry := NewOperationRegistry(stringOp.AsUntyped(), intOp.AsUntyped()) + // Create registry with untyped operations by providing optional initial operation + registry := NewOperationRegistry(stringOp.AsUntyped()) + + // An alternative way to register additional operations without calling AsUntyped() + RegisterOperation(registry, intOp) // Create execution environment b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry))