Skip to content

proxy: go: add context for input methods #10193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ struct flb_plugin_proxy_context {

struct flb_plugin_input_proxy_context {
int coll_fd;
/* This context is set by the remote init and is passed to remote collect */
void *remote_context;
/* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */
struct flb_plugin_proxy *proxy;
};
Expand Down
4 changes: 2 additions & 2 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,9 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
instance->context = NULL;
}
else {
struct flb_plugin_proxy_context *ctx;
struct flb_plugin_input_proxy_context *ctx;

ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
ctx = flb_calloc(1, sizeof(struct flb_plugin_input_proxy_context));
if (!ctx) {
flb_errno();
flb_free(instance);
Expand Down
27 changes: 9 additions & 18 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
#ifdef FLB_HAVE_PROXY_GO
if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
flb_trace("[GO] entering go_collect()");
ret = proxy_go_input_collect(ctx->proxy, &data, &len);
ret = proxy_go_input_collect(ctx, &data, &len);

if (len == 0) {
flb_trace("[GO] No logs are ingested");
Expand All @@ -95,7 +95,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,

flb_input_log_append(ins, NULL, 0, data, len);

ret = proxy_go_input_cleanup(ctx->proxy, data);
ret = proxy_go_input_cleanup(ctx, data);
if (ret == -1) {
flb_errno();
return -1;
Expand All @@ -110,19 +110,10 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins,
struct flb_config *config, void *data)
{
int ret = -1;
struct flb_plugin_input_proxy_context *ctx;
struct flb_plugin_proxy_context *pc;

/* Allocate space for the configuration context */
ctx = flb_malloc(sizeof(struct flb_plugin_input_proxy_context));
if (!ctx) {
flb_errno();
return -1;
}
struct flb_plugin_input_proxy_context *pc;

/* Before to initialize for proxy, set the proxy instance reference */
pc = (struct flb_plugin_proxy_context *)(ins->context);
ctx->proxy = pc->proxy;
pc = (struct flb_plugin_input_proxy_context *)(ins->context);

/* Before to initialize, set the instance reference */
pc->proxy->instance = ins;
Expand All @@ -147,7 +138,7 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins,
}

/* Set the context */
flb_input_set_context(ins, ctx);
flb_input_set_context(ins, pc);

/* Collect upon data available on timer */
ret = flb_input_set_collector_time(ins,
Expand All @@ -159,12 +150,12 @@ static int flb_proxy_input_cb_init(struct flb_input_instance *ins,
flb_error("Could not set collector for threaded proxy input plugin");
goto init_error;
}
ctx->coll_fd = ret;
pc->coll_fd = ret;

return ret;

init_error:
flb_free(ctx);
flb_free(pc);

return -1;
}
Expand Down Expand Up @@ -311,10 +302,10 @@ static int flb_proxy_input_cb_pre_run(struct flb_input_instance *ins,
struct flb_config *config, void *data)
{
int ret = -1;
struct flb_plugin_proxy_context *pc;
struct flb_plugin_input_proxy_context *pc;
struct flb_plugin_proxy *proxy;

pc = (struct flb_plugin_proxy_context *)(ins->context);
pc = (struct flb_plugin_input_proxy_context *)(ins->context);
proxy = pc->proxy;

/* pre_run */
Expand Down
23 changes: 17 additions & 6 deletions src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy,
*
* - FLBPluginInit
* - FLBPluginInputCallback
* - FLBPluginInputCallbackCtx
* - FLBPluginExit
*
* note: registration callback FLBPluginRegister() is resolved by the
Expand All @@ -198,7 +199,9 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy,
}

plugin->cb_collect = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallback");
plugin->cb_collect_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallbackCtx");
plugin->cb_cleanup = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallback");
plugin->cb_cleanup_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallbackCtx");
plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");
plugin->name = flb_strdup(def->name);

Expand Down Expand Up @@ -231,27 +234,35 @@ int proxy_go_input_init(struct flb_plugin_proxy *proxy)
return ret;
}

int proxy_go_input_collect(struct flb_plugin_proxy *ctx,
int proxy_go_input_collect(struct flb_plugin_input_proxy_context *ctx,
void **collected_data, size_t *len)
{
int ret;
void *data = NULL;
struct flbgo_input_plugin *plugin = ctx->data;
struct flbgo_input_plugin *plugin = ctx->proxy->data;

ret = plugin->cb_collect(&data, len);
if (plugin->cb_collect_ctx) {
ret = plugin->cb_collect_ctx(ctx->remote_context, &data, len);
}
else {
ret = plugin->cb_collect(&data, len);
}

*collected_data = data;

return ret;
}

int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
int proxy_go_input_cleanup(struct flb_plugin_input_proxy_context *ctx,
void *allocated_data)
{
int ret = 0;
struct flbgo_input_plugin *plugin = ctx->data;
struct flbgo_input_plugin *plugin = ctx->proxy->data;

if (plugin->cb_cleanup) {
if (plugin->cb_cleanup_ctx) {
ret = plugin->cb_cleanup_ctx(ctx->remote_context, allocated_data);
}
else if (plugin->cb_cleanup) {
ret = plugin->cb_cleanup(allocated_data);
}
else {
Expand Down
8 changes: 5 additions & 3 deletions src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ struct flbgo_input_plugin {
char *name;
void *api;
void *i_ins;
struct flb_plugin_proxy_context *context;
struct flb_plugin_input_proxy_context *context;

int (*cb_init)();
int (*cb_collect)(void **, size_t *);
int (*cb_collect_ctx)(void *, void **, size_t *);
int (*cb_cleanup)(void *);
int (*cb_cleanup_ctx)(void *, void *);
int (*cb_exit)();
};

Expand All @@ -63,9 +65,9 @@ int proxy_go_input_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

int proxy_go_input_init(struct flb_plugin_proxy *proxy);
int proxy_go_input_collect(struct flb_plugin_proxy *ctx,
int proxy_go_input_collect(struct flb_plugin_input_proxy_context *ctx,
void **collected_data, size_t *len);
int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
int proxy_go_input_cleanup(struct flb_plugin_input_proxy_context *ctx,
void *allocated_data);
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx);
void proxy_go_input_unregister(void *data);
Expand Down
Loading