From e8f1a3d2b011f7a17f97752b159c7d8cefd64a0e Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Fri, 1 Aug 2025 00:37:42 +0700 Subject: [PATCH 1/2] Add exit processor (#3196) --- internal/impl/pure/processor_exit.go | 42 ++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 internal/impl/pure/processor_exit.go diff --git a/internal/impl/pure/processor_exit.go b/internal/impl/pure/processor_exit.go new file mode 100644 index 000000000..019bc56de --- /dev/null +++ b/internal/impl/pure/processor_exit.go @@ -0,0 +1,42 @@ +// Copyright 2025 Redpanda Data, Inc. + +package pure + +import ( + "context" + "os" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func init() { + spec := service.NewConfigSpec(). + Categories("Utility"). + Beta(). + Summary(`Exit the process with a code.`). + Field(service.NewIntField("").Description("The exit code to use.").Default(0)) + service.MustRegisterProcessor( + "exit", spec, + func(conf *service.ParsedConfig, res *service.Resources) (service.Processor, error) { + exitCode, err := conf.FieldInt() + if err != nil { + return nil, err + } + + return &exitProcessor{exitCode}, nil + }) +} + +type exitProcessor struct { + exitCode int +} + +func (l *exitProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + os.Exit(l.exitCode) + + return nil, nil +} + +func (l *exitProcessor) Close(ctx context.Context) error { + return nil +} From 3a8231a16947ca41875081c1f2ec9f549e4a3689 Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Fri, 1 Aug 2025 00:42:22 +0700 Subject: [PATCH 2/2] Remove exit processor field description --- internal/impl/pure/processor_exit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/pure/processor_exit.go b/internal/impl/pure/processor_exit.go index 019bc56de..b53ebd81a 100644 --- a/internal/impl/pure/processor_exit.go +++ b/internal/impl/pure/processor_exit.go @@ -14,7 +14,7 @@ func init() { Categories("Utility"). Beta(). Summary(`Exit the process with a code.`). - Field(service.NewIntField("").Description("The exit code to use.").Default(0)) + Field(service.NewIntField("").Default(0)) service.MustRegisterProcessor( "exit", spec, func(conf *service.ParsedConfig, res *service.Resources) (service.Processor, error) {