Skip to content

engine: context event loop. #10244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
12 changes: 12 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,22 @@
* pointers.
*/

#define FLB_CONTEXT_EV_SIGNAL (1 << 0) /* 1 */

enum ctx_signal_type {
FLB_CTX_SIGNAL_RELOAD,
FLB_CTX_SIGNAL_SHUTDOWN,
};

/* Main struct to hold the configuration of the runtime service */
struct flb_config {
struct mk_event ch_event;

/* external communication channel for fluent-bit contexts */
struct mk_event_loop *ctx_evl;
flb_pipefd_t ch_context_signal[2]; /* channel to recieve context signal events */
struct mk_event event_context_signal;

int support_mode; /* enterprise support mode ? */
int is_ingestion_active; /* date ingestion active/allowed */
int is_shutting_down; /* is the service shutting down ? */
Expand Down
17 changes: 12 additions & 5 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ static int exists_header_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
static void *do_reload(void *data)
{
struct reload_ctx *reload = (struct reload_ctx *)data;
enum ctx_signal_type ctx_signal;
int ret;

if (reload == NULL) {
return NULL;
Expand All @@ -470,11 +472,16 @@ static void *do_reload(void *data)

flb_free(reload);
sleep(5);
#ifndef FLB_SYSTEM_WINDOWS
kill(getpid(), SIGHUP);
#else
GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0);
#endif

ctx_signal = FLB_CTX_SIGNAL_RELOAD;
ret = flb_pipe_w(reload->flb->config->ch_context_signal[1],
&ctx_signal,
sizeof(enum ctx_signal_type));
if (ret != 0) {
flb_error("unable to signal reload");
return NULL;
}

return NULL;
}

Expand Down
39 changes: 39 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ int flb_engine_start(struct flb_config *config)
struct flb_sched *sched;
struct flb_net_dns dns_ctx;
struct flb_notification *notification;
enum ctx_signal_type ctx_signal;

/* Initialize the networking layer */
flb_net_lib_init();
Expand All @@ -719,6 +720,29 @@ int flb_engine_start(struct flb_config *config)
}
config->evl_bktq = evl_bktq;

/*
* Event loop channel to send context signals.
*
*/
/* Create the event loop and set it in the global configuration */
config->ctx_evl = mk_event_loop_create(8);
if (!config->ctx_evl) {
fprintf(stderr, "[log] could not create context event loop\n");
return -1;
}

ret = mk_event_channel_create(config->ctx_evl,
&config->ch_context_signal[0],
&config->ch_context_signal[1],
&config->event_context_signal);
if (ret == -1) {
flb_error("[engine] could not create context signal channel");
return -1;
}
/* Signal type to indicate a "context" event */
config->event_context_signal.type = FLB_CONTEXT_EV_SIGNAL;
config->event_context_signal.priority = FLB_ENGINE_PRIORITY_THREAD;

/*
* Event loop channel to ingest flush events from flb_engine_flush()
*
Expand Down Expand Up @@ -1047,8 +1071,14 @@ int flb_engine_start(struct flb_config *config)
flb_info("[engine] service has stopped (%i pending tasks)",
ret);
ret = config->exit_status_code;

ctx_signal = FLB_CTX_SIGNAL_SHUTDOWN;
flb_pipe_w(config->ch_context_signal[1], &ctx_signal,
sizeof(enum ctx_signal_type));

flb_engine_shutdown(config);
config = NULL;

return ret;
}
}
Expand Down Expand Up @@ -1185,6 +1215,15 @@ int flb_engine_shutdown(struct flb_config *config)
flb_hs_destroy(config->http_ctx);
}
#endif
if (config->ctx_evl) {
mk_event_channel_destroy(config->ctx_evl,
config->ch_context_signal[0],
config->ch_context_signal[1],
&config->event_context_signal);
mk_event_loop_destroy(config->ctx_evl);
config->ctx_evl = NULL;
}

if (config->evl) {
mk_event_channel_destroy(config->evl,
config->ch_self_events[0],
Expand Down
109 changes: 59 additions & 50 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ extern void win32_started(void);

flb_ctx_t *ctx;
struct flb_config *config;
int ch_context_signal;
volatile sig_atomic_t exit_signal = 0;
volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE;

Expand Down Expand Up @@ -541,7 +542,11 @@ static void flb_help_plugin(int rc, int format,

static void flb_signal_handler_break_loop(int signal)
{
exit_signal = signal;
enum ctx_signal_type ctx_signal;

ctx_signal = FLB_CTX_SIGNAL_SHUTDOWN;
flb_pipe_w(ch_context_signal, &ctx_signal,
sizeof(enum ctx_signal_type));
}

static void flb_signal_exit(int signal)
Expand Down Expand Up @@ -603,6 +608,8 @@ static void flb_signal_handler_status_line(struct flb_cf *cf_opts)
static void flb_signal_handler(int signal)
{
struct flb_cf *cf_opts = flb_cf_context_get();
enum ctx_signal_type ctx_signal;

flb_signal_handler_status_line(cf_opts);

switch (signal) {
Expand Down Expand Up @@ -633,12 +640,10 @@ static void flb_signal_handler(int signal)
break;
#ifndef FLB_HAVE_STATIC_CONF
case SIGHUP:
if (flb_bin_restarting == FLB_RELOAD_IDLE) {
flb_bin_restarting = FLB_RELOAD_IN_PROGRESS;
}
else {
flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS);
}
ctx_signal = FLB_CTX_SIGNAL_RELOAD;
flb_pipe_w(ch_context_signal, &ctx_signal,
sizeof(enum ctx_signal_type));

break;
#endif
#endif
Expand All @@ -661,6 +666,7 @@ void flb_console_handler_set_ctx(flb_ctx_t *ctx, struct flb_cf *cf_opts)
static BOOL WINAPI flb_console_handler(DWORD evType)
{
struct flb_cf *cf_opts;
enum ctx_signal_type ctx_signal;

switch(evType) {
case 0 /* CTRL_C_EVENT_0 */:
Expand All @@ -674,17 +680,9 @@ static BOOL WINAPI flb_console_handler(DWORD evType)
handler_signal = 2;
break;
case 1 /* CTRL_BREAK_EVENT_1 */:
if (flb_bin_restarting == FLB_RELOAD_IDLE) {
flb_bin_restarting = FLB_RELOAD_IN_PROGRESS;
/* signal the main loop to execute reload. this is necessary since
* all signal handlers in win32 are executed on their own thread.
*/
handler_signal = 1;
flb_bin_restarting = FLB_RELOAD_IDLE;
}
else {
flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS);
}
ctx_signal = FLB_CTX_SIGNAL_SHUTDOWN;
flb_pipe_w(ch_context_signal, &ctx_signal,
sizeof(enum ctx_signal_type));
break;
}
return 1;
Expand Down Expand Up @@ -1003,6 +1001,10 @@ int flb_main(int argc, char **argv)
struct flb_cf *cf_opts;
struct flb_cf_group *group;

struct mk_event *event;
enum ctx_signal_type ctx_signal;
int is_shutdown = 0;

prog_name = argv[0];

cf_opts = flb_cf_create();
Expand Down Expand Up @@ -1410,6 +1412,8 @@ int flb_main(int argc, char **argv)
return ret;
}

ch_context_signal = config->ch_context_signal[1];

/* Store the current config format context from command line */
flb_cf_context_set(cf_opts);

Expand All @@ -1425,42 +1429,47 @@ int flb_main(int argc, char **argv)
}
#endif

while (ctx->status == FLB_LIB_OK && exit_signal == 0) {
sleep(1);

#ifdef FLB_SYSTEM_WINDOWS
if (handler_signal == 1) {
handler_signal = 0;
flb_reload(ctx, cf_opts);
}
else if (handler_signal == 2){
handler_signal = 0;
break;
}
#endif

/* set the context again before checking the status again */
ctx = flb_context_get();
while (exit_signal == 0 && is_shutdown == 0) {
mk_event_wait(config->ctx_evl);
mk_event_foreach(event, config->ctx_evl) {
if (exit_signal) {
break;
}
switch (event->type) {
case FLB_CONTEXT_EV_SIGNAL:
ret = flb_pipe_r(event->fd, &ctx_signal,
sizeof(enum ctx_signal_type));
if (ret <= 0) {
flb_error("unable to read context eventt");
continue;
}

#ifdef FLB_SYSTEM_WINDOWS
flb_console_handler_set_ctx(ctx, cf_opts);
switch(ctx_signal) {
case FLB_CTX_SIGNAL_SHUTDOWN:
is_shutdown = 1;
break;
case FLB_CTX_SIGNAL_RELOAD:
/* reload by using same config files/path */
flb_bin_restarting == FLB_RELOAD_IN_PROGRESS;
ret = flb_reload(ctx, cf_opts);
if (ret == 0) {
ctx = flb_context_get();
flb_bin_restarting = FLB_RELOAD_IDLE;
config = ctx->config;
ch_context_signal = config->ch_context_signal[1];
#ifdef FLB_HAVE_CHUNK_TRACE
if (trace_input != NULL) {
enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props);
}
#endif
if (flb_bin_restarting == FLB_RELOAD_IN_PROGRESS) {
/* reload by using same config files/path */
ret = flb_reload(ctx, cf_opts);
if (ret == 0) {
ctx = flb_context_get();
flb_bin_restarting = FLB_RELOAD_IDLE;
}
else {
flb_bin_restarting = ret;
}
else {
flb_bin_restarting = ret;
}
break;
}
}
}

if (flb_bin_restarting == FLB_RELOAD_HALTED) {
sleep(1);
flb_bin_restarting = FLB_RELOAD_IDLE;
}
}

if (exit_signal) {
Expand Down
38 changes: 4 additions & 34 deletions src/http_server/api/v2/reload.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
msgpack_packer mp_pck;
msgpack_sbuffer mp_sbuf;
int http_status = 200;
enum ctx_signal_type ctx_signal;

/* initialize buffers */
msgpack_sbuffer_init(&mp_sbuf);
Expand All @@ -50,7 +51,6 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "reload", 6);

#ifdef FLB_SYSTEM_WINDOWS
if (config->enable_hot_reload != FLB_TRUE) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "not enabled", 11);
Expand All @@ -67,37 +67,9 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
http_status = 400;
}
else {
ret = GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0);
if (ret == 0) {
mk_http_status(request, 500);
mk_http_done(request);
return;
}

msgpack_pack_str(&mp_pck, 4);
msgpack_pack_str_body(&mp_pck, "done", 4);
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "status", 6);
msgpack_pack_int64(&mp_pck, ret);
}
#else
if (config->enable_hot_reload != FLB_TRUE) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "not enabled", 11);
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "status", 6);
msgpack_pack_int64(&mp_pck, -1);
}
else if (config->hot_reloading == FLB_TRUE) {
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "in progress", 11);
msgpack_pack_str(&mp_pck, 6);
msgpack_pack_str_body(&mp_pck, "status", 6);
msgpack_pack_int64(&mp_pck, -2);
http_status = 400;
}
else {
ret = kill(getpid(), SIGHUP);
ctx_signal = FLB_CTX_SIGNAL_RELOAD;
ret = flb_pipe_w(config->ch_context_signal[1], &ctx_signal,
sizeof(enum ctx_signal_type));
if (ret != 0) {
mk_http_status(request, 500);
mk_http_done(request);
Expand All @@ -111,8 +83,6 @@ static void handle_reload_request(mk_request_t *request, struct flb_config *conf
msgpack_pack_int64(&mp_pck, ret);
}

#endif

/* Export to JSON */
out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
Expand Down
Loading