Skip to content

Commit b3fe505

Browse files
mihaitodorJeffail
andauthored
Add exit_code metadata to the command processor (#349)
Inspired from: redpanda-data/connect#3766 Signed-off-by: Mihai Todor <todormihai@gmail.com> Co-authored-by: Ashley Jeffs <ash@jeffail.uk>
1 parent 6255478 commit b3fe505

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file.
77

88
### Added
99

10+
- The `command` processor now emits the `exit_code` metadata field. (@mihaitodor)
1011
- Go API: `Message` now exposes `MetaGetImmut` and `MetaSetImmut` methods for reading and writing metadata values of any type with immutable semantics. (@Jeffail)
1112

1213
### Fixed

internal/impl/io/processor_command.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,25 @@ func commandProcSpec() *service.ConfigSpec {
2626
Description(`
2727
The specified command is executed for each message processed, with the raw bytes of the message being fed into the stdin of the command process, and the resulting message having its contents replaced with the stdout of it.
2828
29+
== Metadata
30+
31+
This input adds the following metadata fields to each message:
32+
33+
`+"```text"+`
34+
- command_stderr - Contains the stderr output of a successful command, if any.
35+
- exit_code - The exit code returned by the command.
36+
`+"```"+`
37+
38+
You can access these metadata fields using
39+
xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].
40+
2941
== Performance
3042
3143
Since this processor executes a new process for each message performance will likely be an issue for high throughput streams. If this is the case then consider using the xref:components:processors/subprocess.adoc[`+"`subprocess` processor"+`] instead as it keeps the underlying process alive long term and uses codecs to insert and extract inputs and outputs to it via stdin/stdout.
3244
3345
== Error handling
3446
3547
If a non-zero error code is returned by the command then an error containing the entirety of stderr (or a generic message if nothing is written) is set on the message. These failed messages will continue through the pipeline unchanged, but can be dropped or placed in a dead letter queue according to your config, you can read about xref:configuration:error_handling.adoc[these patterns].
36-
37-
If the command is successful but stderr is written to then a metadata field `+"`command_stderr`"+` is populated with its contents.
3848
`).
3949
Fields(
4050
service.NewInterpolatedStringField(cpNameField).
@@ -148,13 +158,16 @@ func (c *commandProc) Process(ctx context.Context, msg *service.Message) (servic
148158
err = cmd.Run()
149159
outBytes, errBytes := stdout.Bytes(), stderr.Bytes()
150160
if err != nil {
151-
return nil, fmt.Errorf("execution error: %w: %s", err, errBytes)
161+
msg.MetaSetMut("exit_code", cmd.ProcessState.ExitCode())
162+
return service.MessageBatch{msg}, fmt.Errorf("execution error: %w: %s", err, errBytes)
152163
}
153164

154165
msg.SetBytes(outBytes)
155166
if len(errBytes) > 0 {
156167
msg.MetaSet("command_stderr", string(errBytes))
157168
}
169+
msg.MetaSetMut("exit_code", 0)
170+
158171
return service.MessageBatch{msg}, nil
159172
}
160173

internal/impl/io/processor_command_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,21 @@ args_mapping: '[ "-n" ]'
8787
require.NoError(t, err)
8888

8989
res, err := cmdProc.Process(tCtx, service.NewMessage([]byte(test.input)))
90+
91+
exitCode, ok := res[0].MetaGetMut("exit_code")
92+
assert.True(t, ok)
93+
9094
if test.errContains != "" {
9195
require.Error(t, err)
9296
assert.Contains(t, err.Error(), test.errContains)
97+
98+
assert.Equal(t, 2, exitCode)
9399
} else {
94100
require.NoError(t, err)
95101
require.Len(t, res, 1)
96102

103+
assert.Equal(t, 0, exitCode)
104+
97105
resBytes, err := res[0].AsBytes()
98106
require.NoError(t, err)
99107
assert.Contains(t, string(resBytes), test.outputContains)

0 commit comments

Comments
 (0)