-
Notifications
You must be signed in to change notification settings - Fork 1.7k
proxy: go: add context for input methods #10193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@mp3monster, jfyi (as the author of the issue) |
7505318
to
27a727b
Compare
Нello, @edsiper, @leonardo-albertovich! Could you please tell if there is an estimated start date of review of this PR? This PR would be helpful for others, because before these changes, the context was only available in the output plugin. This significantly limited the capabilities of input plugins in GoLang, since working in multiinstance mode was impossible. |
27a727b
to
aa810b0
Compare
aa810b0
to
eba088c
Compare
`flb_plugin_proxy_context` was replaced by `flb_plugin_input_proxy_context` Signed-off-by: milkrage <hello@milkrage.ru>
This is necessary so that go input plugin can implement methods with context in the same way as it is done in go output plugin. Signed-off-by: milkrage <hello@milkrage.ru>
The way to add context to input methods is completely identical to the approach used for output methods. Without this, the callback can only perform hardwired tasks, and the configuration values can not be accessed. As a result, the input becomes extremely constrained on what it can do. Signed-off-by: milkrage <hello@milkrage.ru>
eba088c
to
3ed8c97
Compare
I can't reproduce the error (
@edsiper, could you please rerun the matrix unit tests? |
This is a working example of the input plugin with the changes from this PR: fluent-bit-cloudwatch-input-plugin |
Just to clarify the intention of this PR:
a. FLBPluginInputCallbackCtx - A collect function that receives context
This looks fine to me @milkrage as you are keeping backwards compatibility -- Would you happen to also check the input plugin interface? https://github.com/chronosphereio/calyptia-plugin/tree/main |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems good.
I was worried about necessities of context requirements for collect and cleanup callbacks.
Now I realized that they are optional. So, it looks good to me.
@niedbalski, yes, of course. I'll check this evening. |
fluent-bit config: service:
log_level: debug
pipeline:
inputs:
- name: gdummy
outputs:
- name: stdout
match: '*' run in_gdummy from calyptia-plugin/examples: ➜ ~/Projects/calyptia-plugin/examples/in_gdummy go install
➜ ~/Projects/calyptia-plugin/examples/in_gdummy make all
go build -buildmode=c-shared -o in_gdummy.so .
# check: git commit from my branch
➜ ~/Projects/calyptia-plugin/examples/in_gdummy /usr/local/bin/fluent-bit --version
Fluent Bit v4.0.2
Git commit: 3ed8c97b2142d4c319fd02786905ceb019d379e4
# run fluent-bit with gdummy plugin
➜ ~/Projects/calyptia-plugin/examples/in_gdummy /usr/local/bin/fluent-bit -e in_gdummy.so -c config.yaml
Fluent Bit v4.0.2
* Copyright (C) 2015-2025 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
______ _ _ ______ _ _ ___ _____
| ___| | | | | ___ (_) | / || _ |
| |_ | |_ _ ___ _ __ | |_ | |_/ /_| |_ __ __/ /| || |/' |
| _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / / /_| || /| |
| | | | |_| | __/ | | | |_ | |_/ / | |_ \ V /\___ |\ |_/ /
\_| |_|\__,_|\___|_| |_|\__| \____/|_|\__| \_/ |_(_)___/
[2025/04/30 22:51:13] [ info] Configuration:
[2025/04/30 22:51:13] [ info] flush time | 1.000000 seconds
[2025/04/30 22:51:13] [ info] grace | 5 seconds
[2025/04/30 22:51:13] [ info] daemon | 0
[2025/04/30 22:51:13] [ info] ___________
[2025/04/30 22:51:13] [ info] inputs:
[2025/04/30 22:51:13] [ info] gdummy
[2025/04/30 22:51:13] [ info] ___________
[2025/04/30 22:51:13] [ info] filters:
[2025/04/30 22:51:13] [ info] ___________
[2025/04/30 22:51:13] [ info] outputs:
[2025/04/30 22:51:13] [ info] stdout.0
[2025/04/30 22:51:13] [ info] ___________
[2025/04/30 22:51:13] [ info] collectors:
[2025/04/30 22:51:13] [ info] [fluent bit] version=4.0.2, commit=3ed8c97b21, pid=59827
[2025/04/30 22:51:13] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2025/04/30 22:51:13] [ info] [storage] ver=1.5.3, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2025/04/30 22:51:13] [ info] [simd ] disabled
[2025/04/30 22:51:13] [ info] [cmetrics] version=1.0.0
[2025/04/30 22:51:13] [ info] [ctraces ] version=0.6.4
[2025/04/30 22:51:13] [ info] [input:gdummy:gdummy.0] initializing
[2025/04/30 22:51:13] [ info] [input:gdummy:gdummy.0] storage_strategy='memory' (memory only)
[2025/04/30 22:51:13] [debug] [input:gdummy:gdummy.0] [thread init] initialization OK
[2025/04/30 22:51:13] [ info] [input:gdummy:gdummy.0] thread instance initialized
[2025/04/30 22:51:13] [debug] [gdummy:gdummy.0] created event channels: read=38 write=39
[2025/04/30 22:51:13] [debug] [stdout:stdout.0] created event channels: read=42 write=43
[2025/04/30 22:51:13] [ info] [output:stdout:stdout.0] worker #0 started
[2025/04/30 22:51:13] [ info] [sp] stream processor started
[2025/04/30 22:51:14] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:15] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:16] [debug] [task] created task=0x6000022440b0 id=0 OK
[2025/04/30 22:51:16] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042674.417130000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:16] [debug] [out flush] cb_destroy coro_id=0
[2025/04/30 22:51:16] [debug] [task] destroy task=0x6000022440b0 (task_id=0)
[2025/04/30 22:51:16] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:17] [debug] [task] created task=0x60000224c000 id=0 OK
[2025/04/30 22:51:17] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042675.416635000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:17] [debug] [out flush] cb_destroy coro_id=1
[2025/04/30 22:51:17] [debug] [task] destroy task=0x60000224c000 (task_id=0)
[2025/04/30 22:51:17] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:18] [debug] [task] created task=0x600002250000 id=0 OK
[2025/04/30 22:51:18] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042676.417520000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:18] [debug] [out flush] cb_destroy coro_id=2
[2025/04/30 22:51:18] [debug] [task] destroy task=0x600002250000 (task_id=0)
[2025/04/30 22:51:18] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:19] [debug] [task] created task=0x600002248160 id=0 OK
[2025/04/30 22:51:19] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042677.417811000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:19] [debug] [out flush] cb_destroy coro_id=3
[2025/04/30 22:51:19] [debug] [task] destroy task=0x600002248160 (task_id=0)
[2025/04/30 22:51:19] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:20] [debug] [task] created task=0x600002248160 id=0 OK
[0] gdummy.0: [[1746042678.416987000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:20] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2025/04/30 22:51:20] [debug] [out flush] cb_destroy coro_id=4
[2025/04/30 22:51:20] [debug] [task] destroy task=0x600002248160 (task_id=0)
[2025/04/30 22:51:20] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:21] [debug] [task] created task=0x600002254000 id=0 OK
[2025/04/30 22:51:21] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042679.417916000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:21] [debug] [out flush] cb_destroy coro_id=5
[2025/04/30 22:51:21] [debug] [task] destroy task=0x600002254000 (task_id=0)
[2025/04/30 22:51:21] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:22] [debug] [task] created task=0x600002254000 id=0 OK
[2025/04/30 22:51:22] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042680.418499000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:22] [debug] [out flush] cb_destroy coro_id=6
[2025/04/30 22:51:22] [debug] [task] destroy task=0x600002254000 (task_id=0)
[2025/04/30 22:51:22] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:23] [debug] [task] created task=0x600002248160 id=0 OK
[2025/04/30 22:51:23] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042681.418806000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:23] [debug] [out flush] cb_destroy coro_id=7
[2025/04/30 22:51:23] [debug] [task] destroy task=0x600002248160 (task_id=0)
[2025/04/30 22:51:23] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:24] [debug] [task] created task=0x600002254000 id=0 OK
[2025/04/30 22:51:24] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042682.418883000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:24] [debug] [out flush] cb_destroy coro_id=8
[2025/04/30 22:51:24] [debug] [task] destroy task=0x600002254000 (task_id=0)
[2025/04/30 22:51:24] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:25] [debug] [task] created task=0x600002254000 id=0 OK
[2025/04/30 22:51:25] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.0: [[1746042683.418970000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:25] [debug] [out flush] cb_destroy coro_id=9
[2025/04/30 22:51:25] [debug] [task] destroy task=0x600002254000 (task_id=0)
[2025/04/30 22:51:25] [debug] [gdummy] operation succeeded
[2025/04/30 22:51:26] [debug] [task] created task=0x600002250000 id=0 OK
[0] gdummy.0: [[1746042684.418798000, {}], {"message"=>"dummy"}]
[2025/04/30 22:51:26] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2025/04/30 22:51:26] [debug] [out flush] cb_destroy coro_id=10
[2025/04/30 22:51:26] [debug] [task] destroy task=0x600002250000 (task_id=0)
[2025/04/30 22:51:26] [debug] [gdummy] operation succeeded
^C[2025/04/30 22:51:27] [engine] caught signal (SIGINT)
[2025/04/30 22:51:27] [debug] [input:gdummy:gdummy.0] thread pause instance
[2025/04/30 22:51:27] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2025/04/30 22:51:27] [ info] [output:stdout:stdout.0] thread worker #0 stopped
[2025/04/30 22:51:27] [debug] [GO] running exit callback
[2025/04/30 22:51:27] [debug] [input:gdummy:gdummy.0] thread exit instance
2025/04/30 22:51:27 goroutine will be stopping: name="gdummy" Everything works |
@cosmo0920 @niedbalski are there any potential breaking changes ? cc: @pwhelan |
moved to other milestone in the meanwhile |
AFAIK, there's no potential breaking changes. |
These changes resolve the following issue: #8464
Enter
[N/A]
in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test
label to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.