From db09528efe5ca03b830d279f3ae4cfc9e52a526b Mon Sep 17 00:00:00 2001 From: milkrage Date: Wed, 9 Apr 2025 21:48:02 +0300 Subject: [PATCH 1/3] input: change instance context type for plugin proxy `flb_plugin_proxy_context` was replaced by `flb_plugin_input_proxy_context` Signed-off-by: milkrage --- src/flb_input.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_input.c b/src/flb_input.c index ea0c932272c..2ed7e04a35c 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -284,9 +284,9 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->context = NULL; } else { - struct flb_plugin_proxy_context *ctx; + struct flb_plugin_input_proxy_context *ctx; - ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context)); + ctx = flb_calloc(1, sizeof(struct flb_plugin_input_proxy_context)); if (!ctx) { flb_errno(); flb_free(instance); From 4713f44e59369d8cba637e20ebf0e26c1e6ea7d6 Mon Sep 17 00:00:00 2001 From: milkrage Date: Wed, 9 Apr 2025 21:58:59 +0300 Subject: [PATCH 2/3] proxy: send proxy context to go input plugin This is necessary so that go input plugin can implement methods with context in the same way as it is done in go output plugin. Signed-off-by: milkrage --- include/fluent-bit/flb_plugin_proxy.h | 2 ++ src/flb_plugin_proxy.c | 27 +++++++++------------------ 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/include/fluent-bit/flb_plugin_proxy.h b/include/fluent-bit/flb_plugin_proxy.h index 97ceb7ff5d2..233622c8238 100644 --- a/include/fluent-bit/flb_plugin_proxy.h +++ b/include/fluent-bit/flb_plugin_proxy.h @@ -64,6 +64,8 @@ struct flb_plugin_proxy_context { struct flb_plugin_input_proxy_context { int coll_fd; + /* This context is set by the remote init and is passed to remote collect */ + void *remote_context; /* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */ struct flb_plugin_proxy *proxy; }; diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index c8f318afbbd..a10d9afa92b 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -81,7 +81,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins, #ifdef FLB_HAVE_PROXY_GO if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) { flb_trace("[GO] entering go_collect()"); - ret = proxy_go_input_collect(ctx->proxy, &data, &len); + ret = proxy_go_input_collect(ctx, &data, &len); if (len == 0) { flb_trace("[GO] No logs are ingested"); @@ -95,7 +95,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins, flb_input_log_append(ins, NULL, 0, data, len); - ret = proxy_go_input_cleanup(ctx->proxy, data); + ret = proxy_go_input_cleanup(ctx, data); if (ret == -1) { flb_errno(); return -1; @@ -110,19 +110,10 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins, struct flb_config *config, void *data) { int ret = -1; - struct flb_plugin_input_proxy_context *ctx; - struct flb_plugin_proxy_context *pc; - - /* Allocate space for the configuration context */ - ctx = flb_malloc(sizeof(struct flb_plugin_input_proxy_context)); - if (!ctx) { - flb_errno(); - return -1; - } + struct flb_plugin_input_proxy_context *pc; /* Before to initialize for proxy, set the proxy instance reference */ - pc = (struct flb_plugin_proxy_context *)(ins->context); - ctx->proxy = pc->proxy; + pc = (struct flb_plugin_input_proxy_context *)(ins->context); /* Before to initialize, set the instance reference */ pc->proxy->instance = ins; @@ -147,7 +138,7 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins, } /* Set the context */ - flb_input_set_context(ins, ctx); + flb_input_set_context(ins, pc); /* Collect upon data available on timer */ ret = flb_input_set_collector_time(ins, @@ -159,12 +150,12 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins, flb_error("Could not set collector for threaded proxy input plugin"); goto init_error; } - ctx->coll_fd = ret; + pc->coll_fd = ret; return ret; init_error: - flb_free(ctx); + flb_free(pc); return -1; } @@ -311,10 +302,10 @@ static int flb_proxy_input_cb_pre_run(struct flb_input_instance *ins, struct flb_config *config, void *data) { int ret = -1; - struct flb_plugin_proxy_context *pc; + struct flb_plugin_input_proxy_context *pc; struct flb_plugin_proxy *proxy; - pc = (struct flb_plugin_proxy_context *)(ins->context); + pc = (struct flb_plugin_input_proxy_context *)(ins->context); proxy = pc->proxy; /* pre_run */ From 3ed8c97b2142d4c319fd02786905ceb019d379e4 Mon Sep 17 00:00:00 2001 From: milkrage Date: Wed, 9 Apr 2025 22:20:53 +0300 Subject: [PATCH 3/3] proxy: go: add context for input methods The way to add context to input methods is completely identical to the approach used for output methods. Without this, the callback can only perform hardwired tasks, and the configuration values can not be accessed. As a result, the input becomes extremely constrained on what it can do. Signed-off-by: milkrage --- src/proxy/go/go.c | 23 +++++++++++++++++------ src/proxy/go/go.h | 8 +++++--- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/proxy/go/go.c b/src/proxy/go/go.c index 540005a8d9d..e28151b0d81 100644 --- a/src/proxy/go/go.c +++ b/src/proxy/go/go.c @@ -184,6 +184,7 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy, * * - FLBPluginInit * - FLBPluginInputCallback + * - FLBPluginInputCallbackCtx * - FLBPluginExit * * note: registration callback FLBPluginRegister() is resolved by the @@ -198,7 +199,9 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy, } plugin->cb_collect = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallback"); + plugin->cb_collect_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallbackCtx"); plugin->cb_cleanup = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallback"); + plugin->cb_cleanup_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallbackCtx"); plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit"); plugin->name = flb_strdup(def->name); @@ -231,27 +234,35 @@ int proxy_go_input_init(struct flb_plugin_proxy *proxy) return ret; } -int proxy_go_input_collect(struct flb_plugin_proxy *ctx, +int proxy_go_input_collect(struct flb_plugin_input_proxy_context *ctx, void **collected_data, size_t *len) { int ret; void *data = NULL; - struct flbgo_input_plugin *plugin = ctx->data; + struct flbgo_input_plugin *plugin = ctx->proxy->data; - ret = plugin->cb_collect(&data, len); + if (plugin->cb_collect_ctx) { + ret = plugin->cb_collect_ctx(ctx->remote_context, &data, len); + } + else { + ret = plugin->cb_collect(&data, len); + } *collected_data = data; return ret; } -int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, +int proxy_go_input_cleanup(struct flb_plugin_input_proxy_context *ctx, void *allocated_data) { int ret = 0; - struct flbgo_input_plugin *plugin = ctx->data; + struct flbgo_input_plugin *plugin = ctx->proxy->data; - if (plugin->cb_cleanup) { + if (plugin->cb_cleanup_ctx) { + ret = plugin->cb_cleanup_ctx(ctx->remote_context, allocated_data); + } + else if (plugin->cb_cleanup) { ret = plugin->cb_cleanup(allocated_data); } else { diff --git a/src/proxy/go/go.h b/src/proxy/go/go.h index eed328e24bf..1c66b5f708b 100644 --- a/src/proxy/go/go.h +++ b/src/proxy/go/go.h @@ -40,11 +40,13 @@ struct flbgo_input_plugin { char *name; void *api; void *i_ins; - struct flb_plugin_proxy_context *context; + struct flb_plugin_input_proxy_context *context; int (*cb_init)(); int (*cb_collect)(void **, size_t *); + int (*cb_collect_ctx)(void *, void **, size_t *); int (*cb_cleanup)(void *); + int (*cb_cleanup_ctx)(void *, void *); int (*cb_exit)(); }; @@ -63,9 +65,9 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def); int proxy_go_input_init(struct flb_plugin_proxy *proxy); -int proxy_go_input_collect(struct flb_plugin_proxy *ctx, +int proxy_go_input_collect(struct flb_plugin_input_proxy_context *ctx, void **collected_data, size_t *len); -int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, +int proxy_go_input_cleanup(struct flb_plugin_input_proxy_context *ctx, void *allocated_data); int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx); void proxy_go_input_unregister(void *data);