Skip to content

[CLD-354]: feat(operation): support execution of dynamic operation #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/moody-pots-find.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": minor
---

feat: support dynamic execution of operation
71 changes: 64 additions & 7 deletions operations/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package operations

import (
"context"
"errors"
"sync"

"github.com/Masterminds/semver/v3"
Expand All @@ -16,17 +17,36 @@ type Bundle struct {
GetContext func() context.Context
reporter Reporter
// internal use only, for storing the hash of the report to avoid repeat sha256 computation.
reportHashCache *sync.Map
reportHashCache *sync.Map
OperationRegistry *OperationRegistry
}

// BundleOption is a functional option for configuring a Bundle
type BundleOption func(*Bundle)

// WithOperationRegistry sets a custom OperationRegistry for the Bundle
func WithOperationRegistry(registry *OperationRegistry) BundleOption {
return func(b *Bundle) {
b.OperationRegistry = registry
}
}

// NewBundle creates and returns a new Bundle.
func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter) Bundle {
return Bundle{
Logger: logger,
GetContext: getContext,
reporter: reporter,
reportHashCache: &sync.Map{},
func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter, opts ...BundleOption) Bundle {
b := Bundle{
Logger: logger,
GetContext: getContext,
reporter: reporter,
reportHashCache: &sync.Map{},
OperationRegistry: NewOperationRegistry(),
}

// Apply all provided options
for _, opt := range opts {
opt(&b)
}

return b
}

// OperationHandler is the function signature of an operation handler.
Expand Down Expand Up @@ -66,6 +86,11 @@ func (o *Operation[IN, OUT, DEP]) Description() string {
return o.def.Description
}

// Def returns the operation definition.
func (o *Operation[IN, OUT, DEP]) Def() Definition {
return o.def
}

// execute runs the operation by calling the OperationHandler.
func (o *Operation[IN, OUT, DEP]) execute(b Bundle, deps DEP, input IN) (output OUT, err error) {
b.Logger.Infow("Executing operation",
Expand All @@ -74,6 +99,38 @@ func (o *Operation[IN, OUT, DEP]) execute(b Bundle, deps DEP, input IN) (output
return o.handler(b, deps, input)
}

// AsUntyped converts the operation to an untyped operation.
// This is useful for storing operations in a slice or passing them around without type constraints.
// Warning: The input and output types will be converted to `any`, so type safety is lost.
func (o *Operation[IN, OUT, DEP]) AsUntyped() *Operation[any, any, any] {
return &Operation[any, any, any]{
def: Definition{
ID: o.def.ID,
Version: o.def.Version,
Description: o.def.Description,
},
handler: func(b Bundle, deps any, input any) (any, error) {
var typedInput IN
if input != nil {
var ok bool
if typedInput, ok = input.(IN); !ok {
return nil, errors.New("input type mismatch")
}
}

var typedDeps DEP
if deps != nil {
var ok bool
if typedDeps, ok = deps.(DEP); !ok {
return nil, errors.New("dependencies type mismatch")
}
}

return o.execute(b, typedDeps, typedInput)
},
}
}

// NewOperation creates a new operation.
// Version can be created using semver.MustParse("1.0.0") or semver.New("1.0.0").
// Note: The handler should only perform maximum 1 side effect.
Expand Down
65 changes: 65 additions & 0 deletions operations/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func Test_NewOperation(t *testing.T) {
assert.Equal(t, "sum", op.ID())
assert.Equal(t, version.String(), op.Version())
assert.Equal(t, description, op.Description())
assert.Equal(t, op.def, op.Def())
res, err := op.handler(Bundle{}, OpDeps{}, OpInput{1, 2})
require.NoError(t, err)
assert.Equal(t, 3, res)
Expand Down Expand Up @@ -82,3 +83,67 @@ func Test_Operation_WithEmptyInput(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, out)
}

func Test_Operation_AsUntyped(t *testing.T) {
t.Parallel()

version := semver.MustParse("1.0.0")
description := "test operation"
handler1 := func(b Bundle, deps OpDeps, input OpInput) (output int, err error) {
return input.A + input.B, nil
}
typedOp := NewOperation("sum", version, description, handler1)

untypedOp := typedOp.AsUntyped()
bundle := NewBundle(context.Background, logger.Test(t), nil)

assert.Equal(t, "sum", untypedOp.ID())
assert.Equal(t, version.String(), untypedOp.Version())
assert.Equal(t, description, untypedOp.Description())

tests := []struct {
name string
deps any
input any
wantResult any
wantErr bool
errContains string
}{
{
name: "valid input and dependencies",
deps: OpDeps{},
input: OpInput{A: 3, B: 4},
wantResult: 7,
wantErr: false,
},
{
name: "invalid input type",
deps: OpDeps{},
input: struct{ C int }{C: 5},
wantErr: true,
errContains: "input type mismatch",
},
{
name: "invalid dependencies type",
deps: "invalid",
input: OpInput{A: 1, B: 2},
wantErr: true,
errContains: "dependencies type mismatch",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
result, err := untypedOp.handler(bundle, tt.deps, tt.input)

if tt.wantErr {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.errContains)
} else {
require.NoError(t, err)
assert.Equal(t, tt.wantResult, result)
}
})
}
}
37 changes: 37 additions & 0 deletions operations/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package operations

import "errors"

// OperationRegistry is a store for operations that allows retrieval based on their definitions.
type OperationRegistry struct {
ops []*Operation[any, any, any]
}

// NewOperationRegistry creates a new OperationRegistry with the provided untyped operations.
func NewOperationRegistry(ops ...*Operation[any, any, any]) *OperationRegistry {
return &OperationRegistry{
ops: ops,
}
}

// Retrieve retrieves an operation from the store based on its definition.
// It returns an error if the operation is not found.
// The definition must match the operation's ID and version.
func (s OperationRegistry) Retrieve(def Definition) (*Operation[any, any, any], error) {
for _, op := range s.ops {
if op.ID() == def.ID && op.Version() == def.Version.String() {
return op, nil
}
}

return nil, errors.New("operation not found in registry")
}

// RegisterOperation registers new operations in the registry.
// To register operations with different input, output, and dependency types,
// call RegisterOperation multiple times with different type parameters.
func RegisterOperation[D, I, O any](r *OperationRegistry, op ...*Operation[D, I, O]) {
for _, o := range op {
r.ops = append(r.ops, o.AsUntyped())
}
}
161 changes: 161 additions & 0 deletions operations/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package operations

import (
"context"
"fmt"
"testing"

"github.com/Masterminds/semver/v3"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// ExampleOperationRegistry demonstrates how to create and use an OperationRegistry
// with operations being executed dynamically with different input/output types.
func ExampleOperationRegistry() {
// example dependencies for operations
type Deps1 struct{}
type Deps2 struct{}

// Create operations with different input/output types
stringOp := NewOperation(
"string-op",
semver.MustParse("1.0.0"),
"Echo string operation",
func(e Bundle, deps Deps1, input string) (string, error) {
return input, nil
},
)

intOp := NewOperation(
"int-op",
semver.MustParse("1.0.0"),
"Echo integer operation",
func(e Bundle, deps Deps2, input int) (int, error) {
return input, nil
},
)
// Create registry with untyped operations by providing optional initial operation
registry := NewOperationRegistry(stringOp.AsUntyped())

// An alternative way to register additional operations without calling AsUntyped()
RegisterOperation(registry, intOp)

// Create execution environment
b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry))

// Define inputs and dependencies for operations
// inputs[0] is for stringOp, inputs[1] is for intOp
// deps[0] is for stringOp, deps[1] is for intOp
inputs := []any{"input1", 42}
deps := []any{Deps1{}, Deps2{}}
defs := []Definition{
stringOp.Def(),
intOp.Def(),
}

// dynamically retrieve and execute operations on different inputs
for i, def := range defs {
retrievedOp, err := registry.Retrieve(def)
if err != nil {
fmt.Println("error retrieving operation:", err)
continue
}

report, err := ExecuteOperation(b, retrievedOp, deps[i], inputs[i])
if err != nil {
fmt.Println("error executing operation:", err)
continue
}

fmt.Println("operation output:", report.Output)
}

// Output:
// operation output: input1
// operation output: 42
}

func TestOperationRegistry_Retrieve(t *testing.T) {
t.Parallel()

op1 := NewOperation(
"test-op-1",
semver.MustParse("1.0.0"),
"Operation 1",
func(e Bundle, deps OpDeps, input string) (string, error) { return input, nil },
)
op2 := NewOperation(
"test-op-2",
semver.MustParse("2.0.0"),
"Operation 2",
func(e Bundle, deps OpDeps, input int) (int, error) { return input * 2, nil },
)

tests := []struct {
name string
operations []*Operation[any, any, any]
lookup Definition
wantErr bool
wantErrMsg string
wantID string
wantVersion string
}{
{
name: "empty registry",
operations: nil,
lookup: Definition{ID: "test-op-1", Version: semver.MustParse("1.0.0")},
wantErr: true,
wantErrMsg: "operation not found in registry",
},
{
name: "retrieval by exact match - first operation",
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
lookup: Definition{ID: "test-op-1", Version: semver.MustParse("1.0.0")},
wantErr: false,
wantID: "test-op-1",
wantVersion: "1.0.0",
},
{
name: "retrieval by exact match - second operation",
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
lookup: Definition{ID: "test-op-2", Version: semver.MustParse("2.0.0")},
wantErr: false,
wantID: "test-op-2",
wantVersion: "2.0.0",
},
{
name: "operation not found - non-existent ID",
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
lookup: Definition{ID: "non-existent", Version: semver.MustParse("1.0.0")},
wantErr: true,
wantErrMsg: "operation not found in registry",
},
{
name: "operation not found - wrong version",
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
lookup: Definition{ID: "test-op-1", Version: semver.MustParse("3.0.0")},
wantErr: true,
wantErrMsg: "operation not found in registry",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

registry := NewOperationRegistry(tt.operations...)
retrievedOp, err := registry.Retrieve(tt.lookup)

if tt.wantErr {
require.Error(t, err)
assert.ErrorContains(t, err, tt.wantErrMsg)
} else {
require.NoError(t, err)
assert.Equal(t, tt.wantID, retrievedOp.ID())
assert.Equal(t, tt.wantVersion, retrievedOp.Version())
}
})
}
}