From fb58a3c77b2102d82a3a772d68a2a88dcacbc0bd Mon Sep 17 00:00:00 2001 From: Oleg Mukhin Date: Sat, 19 Jul 2025 15:27:17 +0100 Subject: [PATCH 1/4] filter_lookup: added filter for key value lookup New filter aims to address use case of simple data enrichment using static key value lookup. The filter loads first two columns of CSV file into memory as a hash table. When a specified record value matches the key in the hash table the value will be appended to the record (based on key name defined in the filter inputs).) Tested with valgrind. Signed-off-by: Oleg Mukhin --- CMakeLists.txt | 1 + plugins/CMakeLists.txt | 1 + plugins/filter_lookup/CMakeLists.txt | 4 + plugins/filter_lookup/lookup.c | 713 +++++++++++++++++++++++++++ plugins/filter_lookup/lookup.h | 38 ++ 5 files changed, 757 insertions(+) create mode 100644 plugins/filter_lookup/CMakeLists.txt create mode 100644 plugins/filter_lookup/lookup.c create mode 100644 plugins/filter_lookup/lookup.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d9e7c879c9..4e760e1effb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -196,6 +196,7 @@ option(FLB_WASM_STACK_PROTECT "Build with WASM runtime with strong stack protec option(FLB_ENFORCE_ALIGNMENT "Enable limited platform specific aligned memory access" No) option(FLB_KAFKA "Enable Kafka support" Yes) option(FLB_ZIG "Enable zig integration" Yes) +option(FLB_FILTER_LOOKUP "Enable filter lookup support" Yes) # Native Metrics Support (cmetrics) option(FLB_METRICS "Enable metrics support" Yes) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index baaaabd0326..2cd9fb8a063 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -417,6 +417,7 @@ REGISTER_FILTER_PLUGIN("filter_alter_size") REGISTER_FILTER_PLUGIN("filter_aws") REGISTER_FILTER_PLUGIN("filter_checklist") REGISTER_FILTER_PLUGIN("filter_ecs") +REGISTER_FILTER_PLUGIN("filter_lookup") REGISTER_FILTER_PLUGIN("filter_record_modifier") REGISTER_FILTER_PLUGIN("filter_sysinfo") REGISTER_FILTER_PLUGIN("filter_throttle") diff --git a/plugins/filter_lookup/CMakeLists.txt b/plugins/filter_lookup/CMakeLists.txt new file mode 100644 index 00000000000..4b3ad679204 --- /dev/null +++ b/plugins/filter_lookup/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + lookup.c) + +FLB_PLUGIN(filter_lookup "${src}" "") diff --git a/plugins/filter_lookup/lookup.c b/plugins/filter_lookup/lookup.c new file mode 100644 index 00000000000..36f449a86a9 --- /dev/null +++ b/plugins/filter_lookup/lookup.c @@ -0,0 +1,713 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +struct val_node { + struct mk_list _head; + void *val; +}; + +struct lookup_ctx { + struct flb_filter_instance *ins; + char *file; + char *lookup_key; + char *result_key; + struct flb_hash_table *ht; + struct flb_record_accessor *ra_lookup_key; + int ignore_case; + struct mk_list val_list; +}; + +/* + * Trims leading/trailing whitespace and optionally normalizes to lower-case. + * Allocates output buffer (caller must free if output != input). + */ +static int normalize_and_trim(const char *input, size_t len, int ignore_case, char **output, size_t *out_len) +{ + if (!input || len == 0) { + *output = NULL; + *out_len = 0; + return 0; + } + /* Trim leading whitespace */ + const char *start = input; + size_t n = len; + while (n > 0 && isspace((unsigned char)*start)) { + start++; + n--; + } + /* Trim trailing whitespace */ + const char *end = start + n; + while (n > 0 && isspace((unsigned char)*(end - 1))) { + end--; + n--; + } + if (n == 0) { + *output = NULL; + *out_len = 0; + return 0; + } + if (ignore_case) { + char *buf = flb_malloc(n + 1); + if (!buf) { + *output = NULL; + *out_len = 0; + return -1; + } + for (size_t j = 0; j < n; j++) { + buf[j] = tolower((unsigned char)start[j]); + } + buf[n] = '\0'; + *output = buf; + *out_len = n; + return 1; + } else { + *output = (char *)start; + *out_len = n; + return 0; + } +} + +static int load_csv(struct lookup_ctx *ctx) +{ + FILE *fp; + char line[4096]; + int line_num = 1; + fp = fopen(ctx->file, "r"); + if (!fp) { + flb_plg_error(ctx->ins, "cannot open CSV file: %s", ctx->file); + return -1; + } + /* Initialize value list if not already */ + mk_list_init(&ctx->val_list); + /* Skip header */ + if (!fgets(line, sizeof(line), fp)) { + flb_plg_error(ctx->ins, "empty CSV file: %s", ctx->file); + fclose(fp); + return -1; + } + while (fgets(line, sizeof(line), fp)) { + line[strcspn(line, "\r\n")] = '\0'; + if (strlen(line) == 0) { + line_num++; + continue; + } + + /* Handle quotes in CSV files */ + char *p = line; + char key[2048]; + char val[2048]; + size_t key_len = 0, val_len = 0; + key[0] = '\0'; + val[0] = '\0'; + int in_quotes = 0; + int field = 0; /* 0=key, 1=val */ + + /* Parse key from first column (and handle quotes) */ + while (*p && (field == 0)) { + if (!in_quotes && *p == '"') { + in_quotes = 1; + p++; + continue; + } + if (in_quotes) { + if (*p == '"') { + if (*(p+1) == '"') { + /* Escaped quote */ + if (key_len < sizeof(key)-1) key[key_len++] = '"'; + p += 2; + continue; + } else { + in_quotes = 0; + p++; + continue; + } + } + if (key_len < sizeof(key)-1) key[key_len++] = *p; + p++; + continue; + } + if (*p == ',') { + field = 1; + p++; + break; + } + if (key_len < sizeof(key)-1) key[key_len++] = *p; + p++; + } + key[key_len] = '\0'; + + /* Parse value from second column (handle quotes) */ + in_quotes = 0; + while (*p && (field == 1)) { + if (!in_quotes && *p == '"') { + in_quotes = 1; + p++; + continue; + } + if (in_quotes) { + if (*p == '"') { + if (*(p+1) == '"') { + // Escaped quote + if (val_len < sizeof(val)-1) val[val_len++] = '"'; + p += 2; + continue; + } else { + in_quotes = 0; + p++; + continue; + } + } + if (val_len < sizeof(val)-1) val[val_len++] = *p; + p++; + continue; + } + if (*p == ',') { + /* Ignore extra fields */ + break; + } + if (val_len < sizeof(val)-1) val[val_len++] = *p; + p++; + } + + val[val_len] = '\0'; + + /* Check for unmatched quote: if in_quotes is set, log warning and skip line */ + if (in_quotes) { + flb_plg_warn(ctx->ins, "Unmatched quote in line %d, skipping", line_num); + line_num++; + continue; + } + + /* Normalize and trim key */ + char *key_ptr = NULL; + int key_ptr_allocated = normalize_and_trim(key, strlen(key), ctx->ignore_case, &key_ptr, &key_len); + if (key_ptr_allocated < 0) { + line_num++; + continue; + } + /* Normalize and trim value */ + char *val_ptr = NULL; + int val_ptr_allocated = normalize_and_trim(val, strlen(val), 0, &val_ptr, &val_len); + if (val_ptr_allocated < 0) { + if (key_ptr_allocated) flb_free(key_ptr); + line_num++; + continue; + } + if (key_len == 0 || val_len == 0 || key_len > sizeof(key) || val_len > sizeof(val)) { + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + line_num++; + continue; + } + /* Explicitly duplicate value buffer for hash table safety, allocate +1 for null terminator */ + char *val_heap = flb_malloc(val_len + 1); + if (!val_heap) { + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + line_num++; + continue; + } + memcpy(val_heap, val_ptr, val_len); + val_heap[val_len] = '\0'; + int ret = flb_hash_table_add(ctx->ht, key_ptr, key_len, val_heap, val_len); + if (ret < 0) { + flb_free(val_heap); + flb_plg_warn(ctx->ins, "Failed to add key '%.*s' (duplicate or error), skipping", (int)key_len, key_ptr); + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + line_num++; + continue; + } + /* Track allocated value for later cleanup */ + struct val_node *node = flb_malloc(sizeof(struct val_node)); + if (node) { + node->val = val_heap; + mk_list_add(&node->_head, &ctx->val_list); + } else { + /* If malloc fails, value will leak, but plugin will still function */ + flb_plg_warn(ctx->ins, "Failed to allocate val_node for value cleanup, value will leak"); + } + /* Do not free val_heap; hash table owns it now */ + if (key_ptr_allocated) flb_free(key_ptr); + if (val_ptr_allocated) flb_free(val_ptr); + line_num++; + } + fclose(fp); + return 0; +} + +static int cb_lookup_init(struct flb_filter_instance *ins, + struct flb_config *config, + void *data) +{ + int ret; + /* + * Allocate and initialize the filter context for this plugin instance. + * This context will hold configuration, hash table, and state. + */ + struct lookup_ctx *ctx; + ctx = flb_calloc(1, sizeof(struct lookup_ctx)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + + /* + * Populate context fields from config_map. This sets file, lookup_key, + * result_key, and ignore_case from the configuration. + */ + ret = flb_filter_config_map_set(ins, ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* + * Validate required configuration options. All three must be set for + * the filter to operate. + */ + if (!ctx->file || !ctx->lookup_key || !ctx->result_key) { + flb_plg_error(ins, "missing required config: file, lookup_key, result_key"); + if (ctx->file) flb_free(ctx->file); + if (ctx->lookup_key) flb_free(ctx->lookup_key); + if (ctx->result_key) flb_free(ctx->result_key); + flb_free(ctx); + return -1; + } + + /* Check file existence and readability */ + if (access(ctx->file, R_OK) != 0) { + flb_plg_error(ins, "CSV file '%s' does not exist or is not readable", ctx->file); + if (ctx->file) flb_free(ctx->file); + if (ctx->lookup_key) flb_free(ctx->lookup_key); + if (ctx->result_key) flb_free(ctx->result_key); + flb_free(ctx); + return -1; + } + + /* + * Create hash table for lookups. This will store key-value pairs loaded + * from the CSV file for fast lookup during filtering. + */ + ctx->ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1024, -1); + if (!ctx->ht) { + flb_plg_error(ins, "could not create hash table"); + if (ctx->file) flb_free(ctx->file); + if (ctx->lookup_key) flb_free(ctx->lookup_key); + if (ctx->result_key) flb_free(ctx->result_key); + flb_free(ctx); + return -1; + } + + /* Initialize record accessor for lookup_key */ + ctx->ra_lookup_key = flb_ra_create(ctx->lookup_key, FLB_TRUE); + if (!ctx->ra_lookup_key) { + flb_plg_error(ins, "invalid lookup_key pattern: %s", ctx->lookup_key); + flb_hash_table_destroy(ctx->ht); + if (ctx->file) flb_free(ctx->file); + if (ctx->lookup_key) flb_free(ctx->lookup_key); + if (ctx->result_key) flb_free(ctx->result_key); + flb_free(ctx); + return -1; + } + + /* Load CSV data into hash table. */ + ret = load_csv(ctx); + if (ret < 0) { + flb_ra_destroy(ctx->ra_lookup_key); + flb_hash_table_destroy(ctx->ht); + if (ctx->file) flb_free(ctx->file); + if (ctx->lookup_key) flb_free(ctx->lookup_key); + if (ctx->result_key) flb_free(ctx->result_key); + flb_free(ctx); + return -1; + } + flb_plg_info(ins, "Loaded %d entries from CSV", (int)ctx->ht->total_count); + + /* Store context for use in filter and exit callbacks. */ + flb_filter_set_context(ins, ctx); + return 0; +} + +static int emit_original_record( + struct flb_log_event_encoder *log_encoder, + struct flb_log_event *log_event, + struct flb_filter_instance *ins, + int rec_num) +{ + int ret = flb_log_event_encoder_begin_record(log_encoder); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(log_encoder, &log_event->timestamp); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS && log_event->metadata) { + ret = flb_log_event_encoder_set_metadata_from_msgpack_object(log_encoder, log_event->metadata); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_msgpack_object(log_encoder, log_event->body); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(log_encoder); + } else { + flb_log_event_encoder_rollback_record(log_encoder); + flb_plg_warn(ins, "Record %d: failed to encode original record, skipping", rec_num); + } + return ret; +} + +static int cb_lookup_filter(const void *data, size_t bytes, + const char *tag, int tag_len, + void **out_buf, size_t *out_bytes, + struct flb_filter_instance *ins, + struct flb_input_instance *in_ins, + void *context, + struct flb_config *config) +{ + /* + * Main filter callback: processes each log event in the input batch. + * For each record, attempts to look up a value in the hash table using + * the configured key. If found, adds result_key to the record; otherwise, + * emits the original record unchanged. + */ + struct lookup_ctx *ctx = context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event_encoder log_encoder; + struct flb_log_event log_event; + int ret; + int rec_num = 0; + void *found_val = NULL; + size_t found_len = 0; + char *lookup_val_str = NULL; + size_t lookup_val_len = 0; + int lookup_val_allocated = 0; + + /* Defensive: ensure context is valid */ + if (!ctx) { + flb_plg_error(ins, "lookup filter context is NULL"); + return FLB_FILTER_NOTOUCH; + } + + /* Initialize log event decoder for input records */ + ret = flb_log_event_decoder_init(&log_decoder, (char *)data, bytes); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ins, "Log event decoder initialization error : %d", ret); + return FLB_FILTER_NOTOUCH; + } + + /* Initialize log event encoder for output records */ + ret = flb_log_event_encoder_init(&log_encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ins, "Log event encoder initialization error : %d", ret); + flb_log_event_decoder_destroy(&log_decoder); + return FLB_FILTER_NOTOUCH; + } + + /* Process each log event in the input batch */ + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + rec_num++; + lookup_val_str = NULL; + lookup_val_len = 0; + lookup_val_allocated = 0; + + /* Defensive: if body is not a map, emit original record and log debug */ + if (!log_event.body || log_event.body->type != MSGPACK_OBJECT_MAP) { + flb_plg_debug(ins, "Record %d: body is not a map (type=%d), emitting original", rec_num, log_event.body ? log_event.body->type : -1); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + /* + * Pre-scan for lookup_key to check for non-string types (array/map) + * that the record accessor cannot handle, to prevent 'cannot process key value' + * errors from flooding logs. + * + * This is a simple check for top-level keys and will not handle nested + * record accessor patterns. A more robust solution would require parsing + * the accessor pattern, which is beyond the scope of this simple fix. + */ + char *key_to_find = ctx->lookup_key; + size_t key_len = strlen(key_to_find); + if (key_to_find[0] == '$') { + key_to_find++; + key_len--; + } + + int key_found = 0; + int key_type = -1; + msgpack_object_map *map = &log_event.body->via.map; + for (int i = 0; i < map->size; i++) { + msgpack_object_kv *kv = &map->ptr[i]; + if (kv->key.type == MSGPACK_OBJECT_STR && + kv->key.via.str.size == key_len && + strncmp(kv->key.via.str.ptr, key_to_find, key_len) == 0) { + key_found = 1; + key_type = kv->val.type; + break; + } + } + + if (key_found && (key_type == MSGPACK_OBJECT_ARRAY || key_type == MSGPACK_OBJECT_MAP)) { + flb_plg_debug(ins, "Record %d: lookup_key '%s' has type array/map, skipping to avoid ra error", rec_num, ctx->lookup_key); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + /* Use record accessor to get the lookup value */ + struct flb_ra_value *rval = flb_ra_get_value_object(ctx->ra_lookup_key, *log_event.body); + if (!rval) { + /* Key not found, emit original record */ + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + /* Extract string value from record accessor result */ + if (rval->type == FLB_RA_STRING) { + lookup_val_allocated = normalize_and_trim((char *)rval->o.via.str.ptr, rval->o.via.str.size, ctx->ignore_case, &lookup_val_str, &lookup_val_len); + if (lookup_val_allocated < 0) { + flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (string), skipping", rec_num); + lookup_val_str = NULL; + lookup_val_len = 0; + } + } + else { + /* Non-string value: convert to string using direct formatting */ + char val_buf[64]; + int printed = 0; + switch (rval->type) { + case FLB_RA_BOOL: + printed = snprintf(val_buf, sizeof(val_buf), "%s", rval->o.via.boolean ? "true" : "false"); + break; + case FLB_RA_INT: + printed = snprintf(val_buf, sizeof(val_buf), "%" PRId64, rval->o.via.i64); + break; + case FLB_RA_FLOAT: + printed = snprintf(val_buf, sizeof(val_buf), "%f", rval->o.via.f64); + break; + case FLB_RA_NULL: + printed = snprintf(val_buf, sizeof(val_buf), "null"); + break; + case 5: /* ARRAY */ + case 6: /* MAP */ + flb_plg_debug(ins, "Record %d: complex type (ARRAY/MAP) from record accessor, skipping conversion", rec_num); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + default: + flb_plg_debug(ins, "Record %d: unsupported type %d, skipping conversion", rec_num, rval->type); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + if (printed > 0 && printed < (int)sizeof(val_buf)) { + char *val_ptr = val_buf; + size_t val_len = printed; + lookup_val_allocated = normalize_and_trim(val_ptr, val_len, ctx->ignore_case, &lookup_val_str, &lookup_val_len); + if (lookup_val_allocated < 0) { + flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (non-string), skipping", rec_num); + lookup_val_str = NULL; + lookup_val_len = 0; + } + flb_plg_debug(ins, "Record %d: lookup value for key '%s' is non-string, converted to '%s'", rec_num, ctx->lookup_key, lookup_val_str); + } else { + flb_plg_debug(ins, "Record %d: lookup value for key '%s' is non-string and could not be converted, emitting original", rec_num, ctx->lookup_key); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + } + + /* + * If lookup value is missing or empty, emit the original record unchanged. + */ + if (!lookup_val_str || lookup_val_len == 0) { + if (lookup_val_allocated) { + flb_free(lookup_val_str); + } + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + /* + * Attempt to find the lookup value in the hash table. + * If not found, emit the original record unchanged. + */ + int ht_get_ret = flb_hash_table_get(ctx->ht, lookup_val_str, lookup_val_len, &found_val, &found_len); + /* Free normalization buffer if allocated */ + if (lookup_val_allocated) { + flb_free(lookup_val_str); + lookup_val_str = NULL; + } + flb_ra_key_value_destroy(rval); + + if (ht_get_ret < 0 || !found_val || found_len == 0) { + /* Not found, emit original record */ + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + /* Begin new record */ + ret = flb_log_event_encoder_begin_record(&log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to begin new record, emitting original", rec_num); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + ret = flb_log_event_encoder_set_timestamp(&log_encoder, &log_event.timestamp); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to set timestamp, emitting original", rec_num); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + if (log_event.metadata) { + ret = flb_log_event_encoder_set_metadata_from_msgpack_object(&log_encoder, log_event.metadata); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to set metadata, emitting original", rec_num); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + } + + /* Copy all keys except result_key (to avoid collision) */ + if (log_event.body && log_event.body->type == MSGPACK_OBJECT_MAP) { + int i; + for (i = 0; i < log_event.body->via.map.size; i++) { + msgpack_object_kv *kv = &log_event.body->via.map.ptr[i]; + if (kv->key.type == MSGPACK_OBJECT_STR && + kv->key.via.str.size == strlen(ctx->result_key) && + strncmp(kv->key.via.str.ptr, ctx->result_key, kv->key.via.str.size) == 0) { + continue; + } + ret = flb_log_event_encoder_append_body_values(&log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv->key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv->val)); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to append key/value, emitting original", rec_num); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + } + } + + /* Add result_key */ + ret = flb_log_event_encoder_append_body_string(&log_encoder, ctx->result_key, strlen(ctx->result_key)); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to append result_key, emitting original", rec_num); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + ret = flb_log_event_encoder_append_body_string(&log_encoder, (char *)found_val, found_len); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to append found_val, emitting original", rec_num); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + + ret = flb_log_event_encoder_commit_record(&log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_warn(ins, "Record %d: failed to commit record, emitting original", rec_num); + flb_log_event_encoder_rollback_record(&log_encoder); + emit_original_record(&log_encoder, &log_event, ins, rec_num); + continue; + } + } + + /* + * If any records were modified, return the new buffer. + * Otherwise, indicate no change. + */ + if (log_encoder.output_length > 0) { + *out_buf = log_encoder.output_buffer; + *out_bytes = log_encoder.output_length; + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + ret = FLB_FILTER_MODIFIED; + } else { + ret = FLB_FILTER_NOTOUCH; + } + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return ret; +} + +static int cb_lookup_exit(void *data, struct flb_config *config) +{ + struct lookup_ctx *ctx = data; + if (!ctx) return 0; + /* Free all allocated values tracked in val_list */ + struct mk_list *tmp; + struct mk_list *head; + struct val_node *node; + mk_list_foreach_safe(head, tmp, &ctx->val_list) { + node = mk_list_entry(head, struct val_node, _head); + flb_free(node->val); + mk_list_del(head); + flb_free(node); + } + if (ctx->ra_lookup_key) flb_ra_destroy(ctx->ra_lookup_key); + if (ctx->ht) flb_hash_table_destroy(ctx->ht); + if (ctx->file) flb_free(ctx->file); + if (ctx->lookup_key) flb_free(ctx->lookup_key); + if (ctx->result_key) flb_free(ctx->result_key); + flb_free(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { FLB_CONFIG_MAP_STR, "file", NULL, 0, FLB_TRUE, offsetof(struct lookup_ctx, file), "CSV file to lookup values from." }, + { FLB_CONFIG_MAP_STR, "lookup_key", NULL, 0, FLB_TRUE, offsetof(struct lookup_ctx, lookup_key), "Name of the key to lookup in input record." }, + { FLB_CONFIG_MAP_STR, "result_key", NULL, 0, FLB_TRUE, offsetof(struct lookup_ctx, result_key), "Name of the key to add to output record if found." }, + { FLB_CONFIG_MAP_BOOL, "ignore_case", "false", 0, FLB_TRUE, offsetof(struct lookup_ctx, ignore_case), "Ignore case when matching lookup values (default: false)." }, + {0} +}; + +struct flb_filter_plugin filter_lookup_plugin = { + .name = "lookup", + .description = "Lookup values from CSV file and add to records", + .cb_init = cb_lookup_init, + .cb_filter = cb_lookup_filter, + .cb_exit = cb_lookup_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/filter_lookup/lookup.h b/plugins/filter_lookup/lookup.h new file mode 100644 index 00000000000..aad22863474 --- /dev/null +++ b/plugins/filter_lookup/lookup.h @@ -0,0 +1,38 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_FILTER_LOOKUP_H +#define FLB_FILTER_LOOKUP_H + +#include +#include +#include + +struct lookup_ctx { + struct flb_filter_instance *ins; + char *file; + char *lookup_key; + char *result_key; + struct flb_hash_table *ht; + struct flb_record_accessor *ra_lookup_key; + int ignore_case; + struct mk_list val_list; +}; + +extern struct flb_filter_plugin filter_lookup_plugin; + +#endif /* FLB_FILTER_LOOKUP_H */ \ No newline at end of file From 79361e6b6cbb002a2e2a0d082816bf85276a3edd Mon Sep 17 00:00:00 2001 From: Oleg Mukhin Date: Sun, 20 Jul 2025 19:10:13 +0100 Subject: [PATCH 2/4] filter_lookup: stability/operational enhancements - Removed unecessary FLB_FILTER_LOOKUP build flag now LookUp is enabled by default like other filters (without flag). - Fixed critical use-after-free bug in numeric value lookups. - Added processed_records_total, matched_records_total and skipped_records_total metrics to enable operational visibility - Added unit tests to cover handling of different data types, CSV loading/handling and metrics tests. Tested with valgrind - no memory leaks. All unit tests pass. Signed-off-by: Oleg Mukhin --- CMakeLists.txt | 1 - cmake/plugins_options.cmake | 1 + plugins/filter_lookup/lookup.c | 595 ++++++++++++++----- plugins/filter_lookup/lookup.h | 16 +- tests/runtime/CMakeLists.txt | 1 + tests/runtime/filter_lookup.c | 1022 ++++++++++++++++++++++++++++++++ 6 files changed, 1474 insertions(+), 162 deletions(-) create mode 100644 tests/runtime/filter_lookup.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 4e760e1effb..5d9e7c879c9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -196,7 +196,6 @@ option(FLB_WASM_STACK_PROTECT "Build with WASM runtime with strong stack protec option(FLB_ENFORCE_ALIGNMENT "Enable limited platform specific aligned memory access" No) option(FLB_KAFKA "Enable Kafka support" Yes) option(FLB_ZIG "Enable zig integration" Yes) -option(FLB_FILTER_LOOKUP "Enable filter lookup support" Yes) # Native Metrics Support (cmetrics) option(FLB_METRICS "Enable metrics support" Yes) diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 0e6cd9bf080..b878200bfa4 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -84,6 +84,7 @@ DEFINE_OPTION(FLB_FILTER_GEOIP2 "Enable geoip2 filter" DEFINE_OPTION(FLB_FILTER_GREP "Enable grep filter" ON) DEFINE_OPTION(FLB_FILTER_KUBERNETES "Enable kubernetes filter" ON) DEFINE_OPTION(FLB_FILTER_LOG_TO_METRICS "Enable log-derived metrics filter" ON) +DEFINE_OPTION(FLB_FILTER_LOOKUP "Enable lookup filter" ON) DEFINE_OPTION(FLB_FILTER_LUA "Enable Lua scripting filter" ON) DEFINE_OPTION(FLB_FILTER_MODIFY "Enable modify filter" ON) DEFINE_OPTION(FLB_FILTER_MULTILINE "Enable multiline filter" ON) diff --git a/plugins/filter_lookup/lookup.c b/plugins/filter_lookup/lookup.c index 36f449a86a9..fd792b35118 100644 --- a/plugins/filter_lookup/lookup.c +++ b/plugins/filter_lookup/lookup.c @@ -1,6 +1,6 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2024 The Fluent Bit Authors + * Copyright (C) 2015-2025 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ #include #include - #include #include #include @@ -25,6 +24,8 @@ #include #include #include +#include +#include #include #include #include @@ -32,6 +33,35 @@ #include #include #include +#include +#include + +#include "lookup.h" + +/* Macro to increment records metrics */ +#ifdef FLB_HAVE_METRICS +#define INCREMENT_SKIPPED_METRIC(ctx, ins) do { \ + uint64_t ts = cfl_time_now(); \ + cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \ + flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \ +} while(0) + +#define INCREMENT_MATCHED_METRIC(ctx, ins) do { \ + uint64_t ts = cfl_time_now(); \ + cmt_counter_add(ctx->cmt_matched, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \ + flb_metrics_sum(FLB_LOOKUP_METRIC_MATCHED, 1, ins->metrics); \ +} while(0) + +#define INCREMENT_PROCESSED_METRIC(ctx, ins) do { \ + uint64_t ts = cfl_time_now(); \ + cmt_counter_add(ctx->cmt_processed, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \ + flb_metrics_sum(FLB_LOOKUP_METRIC_PROCESSED, 1, ins->metrics); \ +} while(0) +#else +#define INCREMENT_SKIPPED_METRIC(ctx, ins) do { } while(0) +#define INCREMENT_MATCHED_METRIC(ctx, ins) do { } while(0) +#define INCREMENT_PROCESSED_METRIC(ctx, ins) do { } while(0) +#endif struct val_node { @@ -39,17 +69,6 @@ struct val_node { void *val; }; -struct lookup_ctx { - struct flb_filter_instance *ins; - char *file; - char *lookup_key; - char *result_key; - struct flb_hash_table *ht; - struct flb_record_accessor *ra_lookup_key; - int ignore_case; - struct mk_list val_list; -}; - /* * Trims leading/trailing whitespace and optionally normalizes to lower-case. * Allocates output buffer (caller must free if output != input). @@ -100,41 +119,169 @@ static int normalize_and_trim(const char *input, size_t len, int ignore_case, ch } } +/* Dynamic buffer structure for growing strings */ +struct dynamic_buffer { + char *data; + size_t len; + size_t capacity; +}; + +/* Initialize a dynamic buffer */ +static int dynbuf_init(struct dynamic_buffer *buf, size_t initial_capacity) +{ + buf->data = flb_malloc(initial_capacity); + if (!buf->data) { + return -1; + } + buf->len = 0; + buf->capacity = initial_capacity; + buf->data[0] = '\0'; + return 0; +} + +/* Append a character to dynamic buffer, growing if necessary */ +static int dynbuf_append_char(struct dynamic_buffer *buf, char c) +{ + /* Ensure we have space for the character plus null terminator */ + if (buf->len + 1 >= buf->capacity) { + size_t new_capacity = buf->capacity * 2; + char *new_data = flb_realloc(buf->data, new_capacity); + if (!new_data) { + return -1; + } + buf->data = new_data; + buf->capacity = new_capacity; + } + buf->data[buf->len++] = c; + buf->data[buf->len] = '\0'; + return 0; +} + +/* Free dynamic buffer */ +static void dynbuf_destroy(struct dynamic_buffer *buf) +{ + if (buf && buf->data) { + flb_free(buf->data); + buf->data = NULL; + buf->len = 0; + buf->capacity = 0; + } +} + +/* Read a line of arbitrary length from file using dynamic allocation */ +static char *read_line_dynamic(FILE *fp, size_t *line_length) +{ + size_t capacity = 256; /* Initial capacity */ + size_t len = 0; + char *line = flb_malloc(capacity); + int c; + + if (!line) { + return NULL; + } + + while ((c = fgetc(fp)) != EOF) { + /* Check if we need to grow the buffer */ + if (len + 1 >= capacity) { + size_t new_capacity = capacity * 2; + char *new_line = flb_realloc(line, new_capacity); + if (!new_line) { + flb_free(line); + return NULL; + } + line = new_line; + capacity = new_capacity; + } + + /* Add character to buffer */ + line[len++] = c; + + /* Check for end of line */ + if (c == '\n') { + break; + } + } + + /* If we read nothing and hit EOF, return NULL */ + if (len == 0 && c == EOF) { + flb_free(line); + return NULL; + } + + /* Null terminate the string */ + if (len >= capacity) { + char *new_line = flb_realloc(line, len + 1); + if (!new_line) { + flb_free(line); + return NULL; + } + line = new_line; + } + line[len] = '\0'; + + /* Remove trailing \r\n characters */ + while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) { + line[--len] = '\0'; + } + + if (line_length) { + *line_length = len; + } + + return line; +} + static int load_csv(struct lookup_ctx *ctx) { FILE *fp; - char line[4096]; int line_num = 1; fp = fopen(ctx->file, "r"); if (!fp) { - flb_plg_error(ctx->ins, "cannot open CSV file: %s", ctx->file); + flb_plg_error(ctx->ins, "cannot open CSV file '%s': %s", ctx->file, strerror(errno)); return -1; } /* Initialize value list if not already */ mk_list_init(&ctx->val_list); - /* Skip header */ - if (!fgets(line, sizeof(line), fp)) { + + /* Skip header using dynamic line reading */ + char *header_line = read_line_dynamic(fp, NULL); + if (!header_line) { flb_plg_error(ctx->ins, "empty CSV file: %s", ctx->file); fclose(fp); return -1; } - while (fgets(line, sizeof(line), fp)) { - line[strcspn(line, "\r\n")] = '\0'; - if (strlen(line) == 0) { + flb_free(header_line); /* Free the header line as we don't need it */ + + char *line; + size_t line_length; + while ((line = read_line_dynamic(fp, &line_length)) != NULL) { + if (line_length == 0) { + flb_free(line); line_num++; continue; } - /* Handle quotes in CSV files */ + /* Handle quotes in CSV files using dynamic buffers */ char *p = line; - char key[2048]; - char val[2048]; - size_t key_len = 0, val_len = 0; - key[0] = '\0'; - val[0] = '\0'; - int in_quotes = 0; + struct dynamic_buffer key_buf, val_buf; + int in_quotes = 0; int field = 0; /* 0=key, 1=val */ + /* Initialize dynamic buffers */ + if (dynbuf_init(&key_buf, 256) != 0) { + flb_plg_debug(ctx->ins, "Failed to initialize key buffer for line %d", line_num); + flb_free(line); + line_num++; + continue; + } + if (dynbuf_init(&val_buf, 256) != 0) { + flb_plg_debug(ctx->ins, "Failed to initialize value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + flb_free(line); + line_num++; + continue; + } + /* Parse key from first column (and handle quotes) */ while (*p && (field == 0)) { if (!in_quotes && *p == '"') { @@ -146,7 +293,14 @@ static int load_csv(struct lookup_ctx *ctx) if (*p == '"') { if (*(p+1) == '"') { /* Escaped quote */ - if (key_len < sizeof(key)-1) key[key_len++] = '"'; + if (dynbuf_append_char(&key_buf, '"') != 0) { + flb_plg_debug(ctx->ins, "Buffer allocation failed for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } p += 2; continue; } else { @@ -155,7 +309,14 @@ static int load_csv(struct lookup_ctx *ctx) continue; } } - if (key_len < sizeof(key)-1) key[key_len++] = *p; + if (dynbuf_append_char(&key_buf, *p) != 0) { + flb_plg_debug(ctx->ins, "Buffer allocation failed for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } p++; continue; } @@ -164,10 +325,16 @@ static int load_csv(struct lookup_ctx *ctx) p++; break; } - if (key_len < sizeof(key)-1) key[key_len++] = *p; + if (dynbuf_append_char(&key_buf, *p) != 0) { + flb_plg_debug(ctx->ins, "Buffer allocation failed for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } p++; } - key[key_len] = '\0'; /* Parse value from second column (handle quotes) */ in_quotes = 0; @@ -181,7 +348,14 @@ static int load_csv(struct lookup_ctx *ctx) if (*p == '"') { if (*(p+1) == '"') { // Escaped quote - if (val_len < sizeof(val)-1) val[val_len++] = '"'; + if (dynbuf_append_char(&val_buf, '"') != 0) { + flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } p += 2; continue; } else { @@ -190,7 +364,14 @@ static int load_csv(struct lookup_ctx *ctx) continue; } } - if (val_len < sizeof(val)-1) val[val_len++] = *p; + if (dynbuf_append_char(&val_buf, *p) != 0) { + flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } p++; continue; } @@ -198,37 +379,56 @@ static int load_csv(struct lookup_ctx *ctx) /* Ignore extra fields */ break; } - if (val_len < sizeof(val)-1) val[val_len++] = *p; + if (dynbuf_append_char(&val_buf, *p) != 0) { + flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); + line_num++; + goto next_line; + } p++; } - val[val_len] = '\0'; - /* Check for unmatched quote: if in_quotes is set, log warning and skip line */ if (in_quotes) { flb_plg_warn(ctx->ins, "Unmatched quote in line %d, skipping", line_num); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; continue; } /* Normalize and trim key */ char *key_ptr = NULL; - int key_ptr_allocated = normalize_and_trim(key, strlen(key), ctx->ignore_case, &key_ptr, &key_len); + size_t key_len = 0; + int key_ptr_allocated = normalize_and_trim(key_buf.data, key_buf.len, ctx->ignore_case, &key_ptr, &key_len); if (key_ptr_allocated < 0) { + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; continue; } /* Normalize and trim value */ char *val_ptr = NULL; - int val_ptr_allocated = normalize_and_trim(val, strlen(val), 0, &val_ptr, &val_len); + size_t val_len = 0; + int val_ptr_allocated = normalize_and_trim(val_buf.data, val_buf.len, 0, &val_ptr, &val_len); if (val_ptr_allocated < 0) { if (key_ptr_allocated) flb_free(key_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; continue; } - if (key_len == 0 || val_len == 0 || key_len > sizeof(key) || val_len > sizeof(val)) { + if (key_len == 0 || val_len == 0) { if (key_ptr_allocated) flb_free(key_ptr); if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; continue; } @@ -237,6 +437,9 @@ static int load_csv(struct lookup_ctx *ctx) if (!val_heap) { if (key_ptr_allocated) flb_free(key_ptr); if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; continue; } @@ -248,6 +451,9 @@ static int load_csv(struct lookup_ctx *ctx) flb_plg_warn(ctx->ins, "Failed to add key '%.*s' (duplicate or error), skipping", (int)key_len, key_ptr); if (key_ptr_allocated) flb_free(key_ptr); if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; continue; } @@ -263,7 +469,15 @@ static int load_csv(struct lookup_ctx *ctx) /* Do not free val_heap; hash table owns it now */ if (key_ptr_allocated) flb_free(key_ptr); if (val_ptr_allocated) flb_free(val_ptr); + dynbuf_destroy(&key_buf); + dynbuf_destroy(&val_buf); + flb_free(line); line_num++; + continue; + + next_line: + /* Label for error handling - cleanup already done in error paths */ + continue; } fclose(fp); return 0; @@ -286,6 +500,29 @@ static int cb_lookup_init(struct flb_filter_instance *ins, } ctx->ins = ins; +#ifdef FLB_HAVE_METRICS + /* Initialize CMT metrics */ + ctx->cmt_processed = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_processed_records_total", + "Total number of processed records", + 1, (char *[]) {"name"}); + + ctx->cmt_matched = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_matched_records_total", + "Total number of matched records", + 1, (char *[]) {"name"}); + + ctx->cmt_skipped = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_skipped_records_total", + "Total number of skipped records due to errors", + 1, (char *[]) {"name"}); + + /* Add to old metrics system */ + flb_metrics_add(FLB_LOOKUP_METRIC_PROCESSED, "processed_records_total", ins->metrics); + flb_metrics_add(FLB_LOOKUP_METRIC_MATCHED, "matched_records_total", ins->metrics); + flb_metrics_add(FLB_LOOKUP_METRIC_SKIPPED, "skipped_records_total", ins->metrics); +#endif + /* * Populate context fields from config_map. This sets file, lookup_key, * result_key, and ignore_case from the configuration. @@ -302,21 +539,13 @@ static int cb_lookup_init(struct flb_filter_instance *ins, */ if (!ctx->file || !ctx->lookup_key || !ctx->result_key) { flb_plg_error(ins, "missing required config: file, lookup_key, result_key"); - if (ctx->file) flb_free(ctx->file); - if (ctx->lookup_key) flb_free(ctx->lookup_key); - if (ctx->result_key) flb_free(ctx->result_key); - flb_free(ctx); - return -1; + goto error; } /* Check file existence and readability */ if (access(ctx->file, R_OK) != 0) { - flb_plg_error(ins, "CSV file '%s' does not exist or is not readable", ctx->file); - if (ctx->file) flb_free(ctx->file); - if (ctx->lookup_key) flb_free(ctx->lookup_key); - if (ctx->result_key) flb_free(ctx->result_key); - flb_free(ctx); - return -1; + flb_plg_error(ins, "CSV file '%s' does not exist or is not readable: %s", ctx->file, strerror(errno)); + goto error; } /* @@ -326,47 +555,45 @@ static int cb_lookup_init(struct flb_filter_instance *ins, ctx->ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1024, -1); if (!ctx->ht) { flb_plg_error(ins, "could not create hash table"); - if (ctx->file) flb_free(ctx->file); - if (ctx->lookup_key) flb_free(ctx->lookup_key); - if (ctx->result_key) flb_free(ctx->result_key); - flb_free(ctx); - return -1; + goto error; } /* Initialize record accessor for lookup_key */ ctx->ra_lookup_key = flb_ra_create(ctx->lookup_key, FLB_TRUE); if (!ctx->ra_lookup_key) { flb_plg_error(ins, "invalid lookup_key pattern: %s", ctx->lookup_key); - flb_hash_table_destroy(ctx->ht); - if (ctx->file) flb_free(ctx->file); - if (ctx->lookup_key) flb_free(ctx->lookup_key); - if (ctx->result_key) flb_free(ctx->result_key); - flb_free(ctx); - return -1; + goto error; } /* Load CSV data into hash table. */ ret = load_csv(ctx); if (ret < 0) { - flb_ra_destroy(ctx->ra_lookup_key); - flb_hash_table_destroy(ctx->ht); - if (ctx->file) flb_free(ctx->file); - if (ctx->lookup_key) flb_free(ctx->lookup_key); - if (ctx->result_key) flb_free(ctx->result_key); - flb_free(ctx); - return -1; + goto error; } - flb_plg_info(ins, "Loaded %d entries from CSV", (int)ctx->ht->total_count); + flb_plg_info(ins, "Loaded %d entries from CSV file '%s'", (int)ctx->ht->total_count, ctx->file); + flb_plg_info(ins, "Lookup filter initialized: lookup_key='%s', result_key='%s', ignore_case=%s", + ctx->lookup_key, ctx->result_key, ctx->ignore_case ? "true" : "false"); /* Store context for use in filter and exit callbacks. */ flb_filter_set_context(ins, ctx); return 0; + +error: + if (ctx->ra_lookup_key) { + flb_ra_destroy(ctx->ra_lookup_key); + } + if (ctx->ht) { + flb_hash_table_destroy(ctx->ht); + } + flb_free(ctx); + return -1; } static int emit_original_record( struct flb_log_event_encoder *log_encoder, struct flb_log_event *log_event, struct flb_filter_instance *ins, + struct lookup_ctx *ctx, int rec_num) { int ret = flb_log_event_encoder_begin_record(log_encoder); @@ -384,6 +611,9 @@ static int emit_original_record( } else { flb_log_event_encoder_rollback_record(log_encoder); flb_plg_warn(ins, "Record %d: failed to encode original record, skipping", rec_num); + if (ctx) { + INCREMENT_SKIPPED_METRIC(ctx, ins); + } } return ret; } @@ -413,8 +643,9 @@ static int cb_lookup_filter(const void *data, size_t bytes, char *lookup_val_str = NULL; size_t lookup_val_len = 0; int lookup_val_allocated = 0; + bool any_modified = false; /* Track if any records were modified */ - /* Defensive: ensure context is valid */ + /* Ensure context is valid */ if (!ctx) { flb_plg_error(ins, "lookup filter context is NULL"); return FLB_FILTER_NOTOUCH; @@ -438,50 +669,28 @@ static int cb_lookup_filter(const void *data, size_t bytes, /* Process each log event in the input batch */ while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { rec_num++; + INCREMENT_PROCESSED_METRIC(ctx, ins); lookup_val_str = NULL; lookup_val_len = 0; lookup_val_allocated = 0; - - /* Defensive: if body is not a map, emit original record and log debug */ + char *dynamic_val_buf = NULL; /* Track dynamic buffer for numeric conversions */ + + /* Helper macro to clean up dynamic buffer and allocated lookup strings */ + #define CLEANUP_DYNAMIC_BUFFERS() do { \ + if (dynamic_val_buf) { \ + flb_free(dynamic_val_buf); \ + dynamic_val_buf = NULL; \ + } \ + if (lookup_val_allocated && lookup_val_str) { \ + flb_free(lookup_val_str); \ + lookup_val_str = NULL; \ + } \ + } while(0) + + /* If body is not a map, emit original record and log debug */ if (!log_event.body || log_event.body->type != MSGPACK_OBJECT_MAP) { flb_plg_debug(ins, "Record %d: body is not a map (type=%d), emitting original", rec_num, log_event.body ? log_event.body->type : -1); - emit_original_record(&log_encoder, &log_event, ins, rec_num); - continue; - } - - /* - * Pre-scan for lookup_key to check for non-string types (array/map) - * that the record accessor cannot handle, to prevent 'cannot process key value' - * errors from flooding logs. - * - * This is a simple check for top-level keys and will not handle nested - * record accessor patterns. A more robust solution would require parsing - * the accessor pattern, which is beyond the scope of this simple fix. - */ - char *key_to_find = ctx->lookup_key; - size_t key_len = strlen(key_to_find); - if (key_to_find[0] == '$') { - key_to_find++; - key_len--; - } - - int key_found = 0; - int key_type = -1; - msgpack_object_map *map = &log_event.body->via.map; - for (int i = 0; i < map->size; i++) { - msgpack_object_kv *kv = &map->ptr[i]; - if (kv->key.type == MSGPACK_OBJECT_STR && - kv->key.via.str.size == key_len && - strncmp(kv->key.via.str.ptr, key_to_find, key_len) == 0) { - key_found = 1; - key_type = kv->val.type; - break; - } - } - - if (key_found && (key_type == MSGPACK_OBJECT_ARRAY || key_type == MSGPACK_OBJECT_MAP)) { - flb_plg_debug(ins, "Record %d: lookup_key '%s' has type array/map, skipping to avoid ra error", rec_num, ctx->lookup_key); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } @@ -489,7 +698,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, struct flb_ra_value *rval = flb_ra_get_value_object(ctx->ra_lookup_key, *log_event.body); if (!rval) { /* Key not found, emit original record */ - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } @@ -498,66 +707,118 @@ static int cb_lookup_filter(const void *data, size_t bytes, lookup_val_allocated = normalize_and_trim((char *)rval->o.via.str.ptr, rval->o.via.str.size, ctx->ignore_case, &lookup_val_str, &lookup_val_len); if (lookup_val_allocated < 0) { flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (string), skipping", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); lookup_val_str = NULL; lookup_val_len = 0; } } else { - /* Non-string value: convert to string using direct formatting */ - char val_buf[64]; - int printed = 0; + /* Non-string value: convert to string using two-pass dynamic allocation */ + int required_size = 0; + + /* First pass: determine required buffer size */ switch (rval->type) { case FLB_RA_BOOL: - printed = snprintf(val_buf, sizeof(val_buf), "%s", rval->o.via.boolean ? "true" : "false"); + required_size = snprintf(NULL, 0, "%s", rval->o.via.boolean ? "true" : "false"); break; case FLB_RA_INT: - printed = snprintf(val_buf, sizeof(val_buf), "%" PRId64, rval->o.via.i64); + required_size = snprintf(NULL, 0, "%" PRId64, rval->o.via.i64); break; case FLB_RA_FLOAT: - printed = snprintf(val_buf, sizeof(val_buf), "%f", rval->o.via.f64); + required_size = snprintf(NULL, 0, "%f", rval->o.via.f64); break; case FLB_RA_NULL: - printed = snprintf(val_buf, sizeof(val_buf), "null"); + required_size = snprintf(NULL, 0, "null"); break; - case 5: /* ARRAY */ - case 6: /* MAP */ + case 5: + case 6: flb_plg_debug(ins, "Record %d: complex type (ARRAY/MAP) from record accessor, skipping conversion", rec_num); + CLEANUP_DYNAMIC_BUFFERS(); flb_ra_key_value_destroy(rval); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; default: flb_plg_debug(ins, "Record %d: unsupported type %d, skipping conversion", rec_num, rval->type); + CLEANUP_DYNAMIC_BUFFERS(); flb_ra_key_value_destroy(rval); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } - if (printed > 0 && printed < (int)sizeof(val_buf)) { - char *val_ptr = val_buf; - size_t val_len = printed; - lookup_val_allocated = normalize_and_trim(val_ptr, val_len, ctx->ignore_case, &lookup_val_str, &lookup_val_len); - if (lookup_val_allocated < 0) { - flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (non-string), skipping", rec_num); - lookup_val_str = NULL; - lookup_val_len = 0; - } - flb_plg_debug(ins, "Record %d: lookup value for key '%s' is non-string, converted to '%s'", rec_num, ctx->lookup_key, lookup_val_str); - } else { - flb_plg_debug(ins, "Record %d: lookup value for key '%s' is non-string and could not be converted, emitting original", rec_num, ctx->lookup_key); + + if (required_size < 0) { + flb_plg_debug(ins, "Record %d: snprintf sizing failed for type %d, skipping conversion", rec_num, rval->type); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Allocate buffer with required size plus null terminator */ + dynamic_val_buf = flb_malloc(required_size + 1); + if (!dynamic_val_buf) { + flb_plg_warn(ins, "Record %d: malloc failed for dynamic value buffer (size %d), skipping", rec_num, required_size + 1); + INCREMENT_SKIPPED_METRIC(ctx, ins); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + /* Second pass: write to allocated buffer */ + int printed = 0; + switch (rval->type) { + case FLB_RA_BOOL: + printed = snprintf(dynamic_val_buf, required_size + 1, "%s", rval->o.via.boolean ? "true" : "false"); + break; + case FLB_RA_INT: + printed = snprintf(dynamic_val_buf, required_size + 1, "%" PRId64, rval->o.via.i64); + break; + case FLB_RA_FLOAT: + printed = snprintf(dynamic_val_buf, required_size + 1, "%f", rval->o.via.f64); + break; + case FLB_RA_NULL: + printed = snprintf(dynamic_val_buf, required_size + 1, "null"); + break; + } + + if (printed < 0 || printed != required_size) { + flb_plg_debug(ins, "Record %d: snprintf formatting failed (expected %d, got %d), skipping conversion", rec_num, required_size, printed); + CLEANUP_DYNAMIC_BUFFERS(); flb_ra_key_value_destroy(rval); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } + + /* Use the dynamically allocated buffer for normalization */ + lookup_val_allocated = normalize_and_trim(dynamic_val_buf, printed, ctx->ignore_case, &lookup_val_str, &lookup_val_len); + if (lookup_val_allocated < 0) { + flb_plg_warn(ins, "Record %d: malloc failed for normalize_and_trim (non-string), skipping", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); + continue; + } + + flb_plg_debug(ins, "Record %d: lookup value for key '%s' is non-string, converted to '%s'", rec_num, ctx->lookup_key, lookup_val_str ? lookup_val_str : "NULL"); + + /* + * If normalize_and_trim allocated a new buffer (lookup_val_allocated > 0), + * we can free the dynamic buffer now. Otherwise, lookup_val_str points + * into dynamic_val_buf and we must delay freeing it. + */ + if (lookup_val_allocated > 0) { + flb_free(dynamic_val_buf); + dynamic_val_buf = NULL; + } + /* Note: dynamic_val_buf will be freed later if still allocated */ } - /* - * If lookup value is missing or empty, emit the original record unchanged. - */ + /* If lookup value is missing or empty, emit the original record unchanged. */ if (!lookup_val_str || lookup_val_len == 0) { - if (lookup_val_allocated) { - flb_free(lookup_val_str); - } + CLEANUP_DYNAMIC_BUFFERS(); flb_ra_key_value_destroy(rval); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } @@ -566,32 +827,41 @@ static int cb_lookup_filter(const void *data, size_t bytes, * If not found, emit the original record unchanged. */ int ht_get_ret = flb_hash_table_get(ctx->ht, lookup_val_str, lookup_val_len, &found_val, &found_len); - /* Free normalization buffer if allocated */ - if (lookup_val_allocated) { - flb_free(lookup_val_str); - lookup_val_str = NULL; - } - flb_ra_key_value_destroy(rval); if (ht_get_ret < 0 || !found_val || found_len == 0) { /* Not found, emit original record */ - emit_original_record(&log_encoder, &log_event, ins, rec_num); + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } + /* Match found - increment counter */ + INCREMENT_MATCHED_METRIC(ctx, ins); + any_modified = true; /* Mark that we have modified records */ + + flb_plg_trace(ins, "Record %d: Found match for '%.*s' -> '%.*s'", + rec_num, (int)lookup_val_len, lookup_val_str, (int)found_len, (char*)found_val); + + /* Free normalization buffer if allocated (after using it in trace) */ + CLEANUP_DYNAMIC_BUFFERS(); + flb_ra_key_value_destroy(rval); + /* Begin new record */ ret = flb_log_event_encoder_begin_record(&log_encoder); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to begin new record, emitting original", rec_num); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } ret = flb_log_event_encoder_set_timestamp(&log_encoder, &log_event.timestamp); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to set timestamp, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); flb_log_event_encoder_rollback_record(&log_encoder); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } @@ -599,8 +869,9 @@ static int cb_lookup_filter(const void *data, size_t bytes, ret = flb_log_event_encoder_set_metadata_from_msgpack_object(&log_encoder, log_event.metadata); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to set metadata, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); flb_log_event_encoder_rollback_record(&log_encoder); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } } @@ -620,8 +891,9 @@ static int cb_lookup_filter(const void *data, size_t bytes, FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv->val)); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to append key/value, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); flb_log_event_encoder_rollback_record(&log_encoder); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } } @@ -631,33 +903,38 @@ static int cb_lookup_filter(const void *data, size_t bytes, ret = flb_log_event_encoder_append_body_string(&log_encoder, ctx->result_key, strlen(ctx->result_key)); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to append result_key, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); flb_log_event_encoder_rollback_record(&log_encoder); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } ret = flb_log_event_encoder_append_body_string(&log_encoder, (char *)found_val, found_len); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to append found_val, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); flb_log_event_encoder_rollback_record(&log_encoder); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } ret = flb_log_event_encoder_commit_record(&log_encoder); if (ret != FLB_EVENT_ENCODER_SUCCESS) { flb_plg_warn(ins, "Record %d: failed to commit record, emitting original", rec_num); + INCREMENT_SKIPPED_METRIC(ctx, ins); flb_log_event_encoder_rollback_record(&log_encoder); - emit_original_record(&log_encoder, &log_event, ins, rec_num); + emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); continue; } } + #undef CLEANUP_DYNAMIC_BUFFERS + /* * If any records were modified, return the new buffer. - * Otherwise, indicate no change. + * Otherwise, indicate no change to avoid unnecessary buffer copy. */ - if (log_encoder.output_length > 0) { + if (any_modified) { *out_buf = log_encoder.output_buffer; *out_bytes = log_encoder.output_length; flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); @@ -675,6 +952,7 @@ static int cb_lookup_exit(void *data, struct flb_config *config) { struct lookup_ctx *ctx = data; if (!ctx) return 0; + /* Free all allocated values tracked in val_list */ struct mk_list *tmp; struct mk_list *head; @@ -687,9 +965,6 @@ static int cb_lookup_exit(void *data, struct flb_config *config) } if (ctx->ra_lookup_key) flb_ra_destroy(ctx->ra_lookup_key); if (ctx->ht) flb_hash_table_destroy(ctx->ht); - if (ctx->file) flb_free(ctx->file); - if (ctx->lookup_key) flb_free(ctx->lookup_key); - if (ctx->result_key) flb_free(ctx->result_key); flb_free(ctx); return 0; } diff --git a/plugins/filter_lookup/lookup.h b/plugins/filter_lookup/lookup.h index aad22863474..30b9cbd14f7 100644 --- a/plugins/filter_lookup/lookup.h +++ b/plugins/filter_lookup/lookup.h @@ -1,6 +1,6 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2024 The Fluent Bit Authors + * Copyright (C) 2015-2025 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,15 @@ #include #include +#include +#include #include +#include + +/* Metric constants */ +#define FLB_LOOKUP_METRIC_PROCESSED 200 +#define FLB_LOOKUP_METRIC_MATCHED 201 +#define FLB_LOOKUP_METRIC_SKIPPED 202 struct lookup_ctx { struct flb_filter_instance *ins; @@ -31,6 +39,12 @@ struct lookup_ctx { struct flb_record_accessor *ra_lookup_key; int ignore_case; struct mk_list val_list; + +#ifdef FLB_HAVE_METRICS + struct cmt_counter *cmt_processed; + struct cmt_counter *cmt_matched; + struct cmt_counter *cmt_skipped; +#endif }; extern struct flb_filter_plugin filter_lookup_plugin; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 57fb5ede13a..b92e2be2655 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -180,6 +180,7 @@ if(FLB_IN_LIB AND FLB_OUT_LIB) FLB_RT_TEST(FLB_FILTER_NEST "filter_nest.c") FLB_RT_TEST(FLB_FILTER_REWRITE_TAG "filter_rewrite_tag.c") FLB_RT_TEST(FLB_FILTER_KUBERNETES "filter_kubernetes.c") + FLB_RT_TEST(FLB_FILTER_LOOKUP "filter_lookup.c") FLB_RT_TEST(FLB_FILTER_PARSER "filter_parser.c") FLB_RT_TEST(FLB_FILTER_MODIFY "filter_modify.c") FLB_RT_TEST(FLB_FILTER_LUA "filter_lua.c") diff --git a/tests/runtime/filter_lookup.c b/tests/runtime/filter_lookup.c new file mode 100644 index 00000000000..f4ccbd39cf0 --- /dev/null +++ b/tests/runtime/filter_lookup.c @@ -0,0 +1,1022 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" + +#define TMP_CSV_PATH "lookup_test.csv" + +struct test_ctx { + flb_ctx_t *flb; + int i_ffd; + int f_ffd; + int o_ffd; +}; + +static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data) +{ + int i_ffd; + int o_ffd; + int f_ffd; + struct test_ctx *ctx = NULL; + + ctx = flb_malloc(sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("malloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "error", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "lib", NULL); + TEST_CHECK(i_ffd >= 0); + flb_input_set(ctx->flb, i_ffd, "tag", "test", NULL); + ctx->i_ffd = i_ffd; + + /* Filter */ + f_ffd = flb_filter(ctx->flb, (char *) "lookup", NULL); + TEST_CHECK(f_ffd >= 0); + ctx->f_ffd = f_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + ctx->o_ffd = o_ffd; + TEST_CHECK(o_ffd >= 0); + flb_output_set(ctx->flb, o_ffd, + "match", "test", + NULL); + + return ctx; +} + +static void test_ctx_destroy(struct test_ctx *ctx) +{ + TEST_CHECK(ctx != NULL); + + sleep(1); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +void delete_csv_file() +{ + unlink(TMP_CSV_PATH); +} + +int create_csv_file(char *csv_content) +{ + FILE *fp = NULL; + fp = fopen(TMP_CSV_PATH, "w"); + if (fp == NULL) { + TEST_MSG("fopen error\n"); + return -1; + } + fprintf(fp, "%s", csv_content); + fflush(fp); + fclose(fp); + return 0; +} + +/* Callback to check expected results */ +static int cb_check_result_json(void *record, size_t size, void *data) +{ + char *p; + char *expected; + char *result; + + expected = (char *) data; + result = (char *) record; + + p = strstr(result, expected); + TEST_CHECK(p != NULL); + + if (p == NULL) { + flb_error("Expected to find: '%s' in result '%s'", + expected, result); + } + + flb_free(record); + return 0; +} + +/* Test basic lookup functionality */ +void flb_test_lookup_basic(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user1,John Doe\n" + "user2,Jane Smith\n" + "user3,Bob Wilson\n"; + char *input = "[0, {\"user_id\": \"user1\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"John Doe\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with ignore_case option */ +void flb_test_lookup_ignore_case(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "USER1,John Doe\n" + "user2,Jane Smith\n"; + char *input = "[0, {\"user_id\": \"user1\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"John Doe\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + "ignore_case", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with CSV containing quotes and special characters */ +void flb_test_lookup_csv_quotes(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "\"quoted,key\",\"Value with \"\"quotes\"\" and, commas\"\n" + "simple_key,Simple Value\n"; + char *input = "[0, {\"lookup_field\": \"quoted,key\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"result_field\":\"Value with \\\"quotes\\\" and, commas\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "lookup_field", + "result_key", "result_field", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with numeric values */ +void flb_test_lookup_numeric_values(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "123,Numeric Key\n" + "456,Another Number\n"; + char *input = "[0, {\"numeric_field\": 123}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"description\":\"Numeric Key\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "numeric_field", + "result_key", "description", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with very large numbers (testing the two-pass snprintf fix) */ +void flb_test_lookup_large_numbers(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char large_number_str[64]; + snprintf(large_number_str, sizeof(large_number_str), "%" PRId64, LLONG_MAX); + + char csv_content[256]; + snprintf(csv_content, sizeof(csv_content), + "key,value\n" + "%s,Very Large Number\n" + "456,Small Number\n", large_number_str); + + char input[128]; + snprintf(input, sizeof(input), "[0, {\"big_number\": %" PRId64 "}]", LLONG_MAX); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"number_desc\":\"Very Large Number\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "big_number", + "result_key", "number_desc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with boolean values */ +void flb_test_lookup_boolean_values(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "true,Boolean True\n" + "false,Boolean False\n"; + char *input = "[0, {\"bool_field\": true}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"bool_desc\":\"Boolean True\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "bool_field", + "result_key", "bool_desc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test lookup with no match (should emit original record) */ +void flb_test_lookup_no_match(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user1,John Doe\n" + "user2,Jane Smith\n"; + char *input = "[0, {\"user_id\": \"user999\", \"other_field\": \"test\"}]"; + + /* Should NOT contain the result_key since no match was found */ + cb_data.cb = cb_check_result_json; + cb_data.data = "\"other_field\":\"test\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test dynamic line reading with very long CSV lines */ +void flb_test_lookup_long_csv_lines(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *input = "[0, {\"key_field\": \"long_key\"}]"; + + /* Test that long CSV values (>4096 chars) can be read correctly. + * Just verify that the lookup worked by checking for value_field key. */ + cb_data.cb = cb_check_result_json; + cb_data.data = "\"value_field\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Create CSV file with very long lines */ + FILE *fp = fopen(TMP_CSV_PATH, "w"); + TEST_CHECK(fp != NULL); + + fprintf(fp, "key,value\n"); + fprintf(fp, "long_key,"); + + /* Write a very long value (> 4096 chars) */ + for (int i = 0; i < 100; i++) { + fprintf(fp, "This is a very long value that exceeds the original 4096 character buffer limit to test dynamic line reading functionality. "); + } + fprintf(fp, "\n"); + fprintf(fp, "short_key,Short Value\n"); + fclose(fp); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "key_field", + "result_key", "value_field", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test with whitespace trimming */ +void flb_test_lookup_whitespace_trim(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + " trimmed_key , Trimmed Value \n" + "normal_key,Normal Value\n"; + char *input = "[0, {\"lookup_field\": \" trimmed_key \"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"result_field\":\"Trimmed Value\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "lookup_field", + "result_key", "result_field", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Mock the dynamic buffer functions and structure for testing */ +struct dynamic_buffer { + char *data; + size_t len; + size_t capacity; +}; + +static int dynbuf_init(struct dynamic_buffer *buf, size_t initial_capacity) { + buf->data = malloc(initial_capacity); + if (!buf->data) return -1; + buf->len = 0; + buf->capacity = initial_capacity; + buf->data[0] = '\0'; + return 0; +} + +static int dynbuf_append_char(struct dynamic_buffer *buf, char c) { + if (buf->len + 1 >= buf->capacity) { + size_t new_capacity = buf->capacity * 2; + char *new_data = realloc(buf->data, new_capacity); + if (!new_data) return -1; + buf->data = new_data; + buf->capacity = new_capacity; + } + buf->data[buf->len++] = c; + buf->data[buf->len] = '\0'; + return 0; +} + +static void dynbuf_destroy(struct dynamic_buffer *buf) { + if (buf && buf->data) { + free(buf->data); + buf->data = NULL; + buf->len = 0; + buf->capacity = 0; + } +} + +/* Test dynamic buffer functionality */ +void flb_test_dynamic_buffer(void) +{ + /* This is an internal unit test that doesn't require Fluent Bit setup */ + + struct dynamic_buffer buf; + + /* Test initialization */ + int ret = dynbuf_init(&buf, 4); + TEST_CHECK(ret == 0); + TEST_CHECK(buf.capacity == 4); + TEST_CHECK(buf.len == 0); + + /* Test appending characters that will cause growth */ + const char *test_str = "This is a test string that is longer than the initial capacity"; + for (size_t i = 0; test_str[i]; i++) { + ret = dynbuf_append_char(&buf, test_str[i]); + TEST_CHECK(ret == 0); + } + + TEST_CHECK(strcmp(buf.data, test_str) == 0); + TEST_CHECK(buf.len == strlen(test_str)); + TEST_CHECK(buf.capacity >= buf.len + 1); + + dynbuf_destroy(&buf); +} + +/* Test nested record accessor patterns ($a.b.c) */ +void flb_test_lookup_nested_keys(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user123,John Doe\n" + "admin456,Jane Smith\n"; + char *input = "[0, {\"user\": {\"profile\": {\"id\": \"user123\"}}, \"other_field\": \"test\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"John Doe\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "$user['profile']['id']", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test with large CSV file (performance/load testing) */ +void flb_test_lookup_large_csv(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *input = "[0, {\"user_id\": \"user5000\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_name\":\"User 5000\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Create CSV file with 10,000 entries for performance testing */ + FILE *fp = fopen(TMP_CSV_PATH, "w"); + TEST_CHECK(fp != NULL); + + fprintf(fp, "key,value\n"); + + /* Write 10,000 test entries */ + for (int i = 1; i <= 10000; i++) { + fprintf(fp, "user%d,User %d\n", i, i); + } + fclose(fp); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Test lookup performance with large dataset */ + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(2000); /* Give more time for large CSV processing */ + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test nested record accessor with array indexing ($users[0].id) */ +void flb_test_lookup_nested_array_keys(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "array_user1,First User\n" + "array_user2,Second User\n"; + char *input = "[0, {\"users\": [{\"id\": \"array_user1\"}, {\"id\": \"array_user2\"}], \"metadata\": \"test\"}]"; + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"user_desc\":\"First User\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "$users[0]['id']", + "result_key", "user_desc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + flb_time_msleep(1500); + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Custom callback to capture metrics and verify counts */ +static int cb_check_metrics(void *record, size_t size, void *data) +{ + /* Just free the record - we'll check metrics through the filter instance */ + flb_free(record); + return 0; +} + +/* Helper function to get metric value from filter instance */ +static uint64_t get_filter_metric(struct test_ctx *ctx, int metric_id) +{ + struct flb_filter_instance *f_ins; + struct mk_list *head; + struct flb_metric *metric; + + mk_list_foreach(head, &ctx->flb->config->filters) { + f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (f_ins->id == ctx->f_ffd && f_ins->metrics) { + metric = flb_metrics_get_id(metric_id, f_ins->metrics); + if (metric) { + return metric->val; + } + } + } + return 0; +} + +/* Test metrics with matched records */ +void flb_test_lookup_metrics_matched(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "user1,John Doe\n" + "user2,Jane Smith\n" + "user3,Bob Wilson\n"; + char *input1 = "[0, {\"user_id\": \"user1\"}]"; + char *input2 = "[0, {\"user_id\": \"user2\"}]"; + char *input3 = "[0, {\"user_id\": \"unknown\"}]"; // No match + + cb_data.cb = cb_check_metrics; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "user_id", + "result_key", "user_name", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Process three records: 2 matches + 1 no-match */ + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input1, strlen(input1)); + TEST_CHECK(bytes == strlen(input1)); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input2, strlen(input2)); + TEST_CHECK(bytes == strlen(input2)); + + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input3, strlen(input3)); + TEST_CHECK(bytes == strlen(input3)); + + flb_time_msleep(2000); + + /* Check metrics: should have 3 processed, 2 matched, 0 skipped */ + uint64_t processed = get_filter_metric(ctx, 200); // FLB_LOOKUP_METRIC_PROCESSED + uint64_t matched = get_filter_metric(ctx, 201); // FLB_LOOKUP_METRIC_MATCHED + uint64_t skipped = get_filter_metric(ctx, 202); // FLB_LOOKUP_METRIC_SKIPPED + + TEST_CHECK(processed == 3); + TEST_CHECK(matched == 2); + TEST_CHECK(skipped == 0); + + if (processed != 3) { + TEST_MSG("Expected processed=3, got %" PRIu64, processed); + } + if (matched != 2) { + TEST_MSG("Expected matched=2, got %" PRIu64, matched); + } + if (skipped != 0) { + TEST_MSG("Expected skipped=0, got %" PRIu64, skipped); + } + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +/* Test metrics with large volume to verify counter accuracy */ +void flb_test_lookup_metrics_processed(void) +{ + int ret; + int bytes; + struct test_ctx *ctx; + struct flb_lib_out_cb cb_data; + char *csv_content = + "key,value\n" + "match_key,Matched Value\n"; + + cb_data.cb = cb_check_metrics; + cb_data.data = NULL; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = create_csv_file(csv_content); + TEST_CHECK(ret == 0); + + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "Match", "*", + "file", TMP_CSV_PATH, + "lookup_key", "test_key", + "result_key", "test_result", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Send 20 matching records and 10 non-matching records */ + const int matching_count = 20; + const int non_matching_count = 10; + + for (int i = 0; i < matching_count; i++) { + char input[256]; + snprintf(input, sizeof(input), "[0, {\"test_key\": \"match_key\", \"seq\": %d}]", i); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + } + + for (int i = 0; i < non_matching_count; i++) { + char input[256]; + snprintf(input, sizeof(input), "[0, {\"test_key\": \"no_match_%d\", \"seq\": %d}]", i, i); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + } + + flb_time_msleep(3000); /* Give more time for processing large volume */ + + /* Verify metrics accuracy */ + uint64_t processed = get_filter_metric(ctx, 200); + uint64_t matched = get_filter_metric(ctx, 201); + uint64_t skipped = get_filter_metric(ctx, 202); + + TEST_CHECK(processed == matching_count + non_matching_count); + TEST_CHECK(matched == matching_count); + TEST_CHECK(skipped == 0); + + if (processed != matching_count + non_matching_count) { + TEST_MSG("Expected processed=%d, got %" PRIu64, matching_count + non_matching_count, processed); + } + if (matched != matching_count) { + TEST_MSG("Expected matched=%d, got %" PRIu64, matching_count, matched); + } + if (skipped != 0) { + TEST_MSG("Expected skipped=0, got %" PRIu64, skipped); + } + + delete_csv_file(); + test_ctx_destroy(ctx); +} + +TEST_LIST = { + {"basic_lookup", flb_test_lookup_basic}, + {"ignore_case", flb_test_lookup_ignore_case}, + {"csv_quotes", flb_test_lookup_csv_quotes}, + {"numeric_values", flb_test_lookup_numeric_values}, + {"large_numbers", flb_test_lookup_large_numbers}, + {"boolean_values", flb_test_lookup_boolean_values}, + {"no_match", flb_test_lookup_no_match}, + {"long_csv_lines", flb_test_lookup_long_csv_lines}, + {"whitespace_trim", flb_test_lookup_whitespace_trim}, + {"dynamic_buffer", flb_test_dynamic_buffer}, + {"nested_keys", flb_test_lookup_nested_keys}, + {"large_csv", flb_test_lookup_large_csv}, + {"nested_array_keys", flb_test_lookup_nested_array_keys}, + {"metrics_matched", flb_test_lookup_metrics_matched}, + {"metrics_processed", flb_test_lookup_metrics_processed}, + {NULL, NULL} +}; From ea66ebb52a0c27382d97753fc2959446004983b5 Mon Sep 17 00:00:00 2001 From: Oleg Mukhin Date: Mon, 21 Jul 2025 12:06:45 +0100 Subject: [PATCH 3/4] filter_lookup: fix Windows/CentOS compiler compatibility - fix variable declarations and remove C99 features - Conditional compilation for Windows vs Unix headers/functions - Replace bool with int, fix format specifiers, update comments All 15 unit tests for filter passed. Signed-off-by: Oleg Mukhin --- plugins/filter_lookup/lookup.c | 129 +++++++++++++++++++++------------ 1 file changed, 84 insertions(+), 45 deletions(-) diff --git a/plugins/filter_lookup/lookup.c b/plugins/filter_lookup/lookup.c index fd792b35118..8e07fda1462 100644 --- a/plugins/filter_lookup/lookup.c +++ b/plugins/filter_lookup/lookup.c @@ -31,10 +31,14 @@ #include #include #include +#ifndef _WIN32 #include +#else +#include +#endif #include -#include #include +#include #include "lookup.h" @@ -42,19 +46,25 @@ #ifdef FLB_HAVE_METRICS #define INCREMENT_SKIPPED_METRIC(ctx, ins) do { \ uint64_t ts = cfl_time_now(); \ - cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \ + static char* labels_array[1]; \ + labels_array[0] = (char*)flb_filter_name(ins); \ + cmt_counter_add(ctx->cmt_skipped, ts, 1, 1, labels_array); \ flb_metrics_sum(FLB_LOOKUP_METRIC_SKIPPED, 1, ins->metrics); \ } while(0) #define INCREMENT_MATCHED_METRIC(ctx, ins) do { \ uint64_t ts = cfl_time_now(); \ - cmt_counter_add(ctx->cmt_matched, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \ + static char* labels_array[1]; \ + labels_array[0] = (char*)flb_filter_name(ins); \ + cmt_counter_add(ctx->cmt_matched, ts, 1, 1, labels_array); \ flb_metrics_sum(FLB_LOOKUP_METRIC_MATCHED, 1, ins->metrics); \ } while(0) #define INCREMENT_PROCESSED_METRIC(ctx, ins) do { \ uint64_t ts = cfl_time_now(); \ - cmt_counter_add(ctx->cmt_processed, ts, 1, 1, (char *[]) {(char*)flb_filter_name(ins)}); \ + static char* labels_array[1]; \ + labels_array[0] = (char*)flb_filter_name(ins); \ + cmt_counter_add(ctx->cmt_processed, ts, 1, 1, labels_array); \ flb_metrics_sum(FLB_LOOKUP_METRIC_PROCESSED, 1, ins->metrics); \ } while(0) #else @@ -71,7 +81,6 @@ struct val_node { /* * Trims leading/trailing whitespace and optionally normalizes to lower-case. - * Allocates output buffer (caller must free if output != input). */ static int normalize_and_trim(const char *input, size_t len, int ignore_case, char **output, size_t *out_len) { @@ -99,13 +108,15 @@ static int normalize_and_trim(const char *input, size_t len, int ignore_case, ch return 0; } if (ignore_case) { - char *buf = flb_malloc(n + 1); + char *buf; + size_t j; + buf = flb_malloc(n + 1); if (!buf) { *output = NULL; *out_len = 0; return -1; } - for (size_t j = 0; j < n; j++) { + for (j = 0; j < n; j++) { buf[j] = tolower((unsigned char)start[j]); } buf[n] = '\0'; @@ -235,6 +246,10 @@ static int load_csv(struct lookup_ctx *ctx) { FILE *fp; int line_num = 1; + char *header_line; + char *line; + size_t line_length; + fp = fopen(ctx->file, "r"); if (!fp) { flb_plg_error(ctx->ins, "cannot open CSV file '%s': %s", ctx->file, strerror(errno)); @@ -244,7 +259,7 @@ static int load_csv(struct lookup_ctx *ctx) mk_list_init(&ctx->val_list); /* Skip header using dynamic line reading */ - char *header_line = read_line_dynamic(fp, NULL); + header_line = read_line_dynamic(fp, NULL); if (!header_line) { flb_plg_error(ctx->ins, "empty CSV file: %s", ctx->file); fclose(fp); @@ -252,9 +267,21 @@ static int load_csv(struct lookup_ctx *ctx) } flb_free(header_line); /* Free the header line as we don't need it */ - char *line; - size_t line_length; while ((line = read_line_dynamic(fp, &line_length)) != NULL) { + char *p; + struct dynamic_buffer key_buf, val_buf; + int in_quotes; + int field; /* 0=key, 1=val */ + char *key_ptr; + size_t key_len; + int key_ptr_allocated; + char *val_ptr; + size_t val_len; + int val_ptr_allocated; + char *val_heap; + int ret; + struct val_node *node; + if (line_length == 0) { flb_free(line); line_num++; @@ -262,10 +289,9 @@ static int load_csv(struct lookup_ctx *ctx) } /* Handle quotes in CSV files using dynamic buffers */ - char *p = line; - struct dynamic_buffer key_buf, val_buf; - int in_quotes = 0; - int field = 0; /* 0=key, 1=val */ + p = line; + in_quotes = 0; + field = 0; /* Initialize dynamic buffers */ if (dynbuf_init(&key_buf, 256) != 0) { @@ -347,7 +373,7 @@ static int load_csv(struct lookup_ctx *ctx) if (in_quotes) { if (*p == '"') { if (*(p+1) == '"') { - // Escaped quote + /* Escaped quote */ if (dynbuf_append_char(&val_buf, '"') != 0) { flb_plg_error(ctx->ins, "Failed to append to value buffer for line %d", line_num); dynbuf_destroy(&key_buf); @@ -401,9 +427,9 @@ static int load_csv(struct lookup_ctx *ctx) } /* Normalize and trim key */ - char *key_ptr = NULL; - size_t key_len = 0; - int key_ptr_allocated = normalize_and_trim(key_buf.data, key_buf.len, ctx->ignore_case, &key_ptr, &key_len); + key_ptr = NULL; + key_len = 0; + key_ptr_allocated = normalize_and_trim(key_buf.data, key_buf.len, ctx->ignore_case, &key_ptr, &key_len); if (key_ptr_allocated < 0) { dynbuf_destroy(&key_buf); dynbuf_destroy(&val_buf); @@ -412,9 +438,9 @@ static int load_csv(struct lookup_ctx *ctx) continue; } /* Normalize and trim value */ - char *val_ptr = NULL; - size_t val_len = 0; - int val_ptr_allocated = normalize_and_trim(val_buf.data, val_buf.len, 0, &val_ptr, &val_len); + val_ptr = NULL; + val_len = 0; + val_ptr_allocated = normalize_and_trim(val_buf.data, val_buf.len, 0, &val_ptr, &val_len); if (val_ptr_allocated < 0) { if (key_ptr_allocated) flb_free(key_ptr); dynbuf_destroy(&key_buf); @@ -433,7 +459,7 @@ static int load_csv(struct lookup_ctx *ctx) continue; } /* Explicitly duplicate value buffer for hash table safety, allocate +1 for null terminator */ - char *val_heap = flb_malloc(val_len + 1); + val_heap = flb_malloc(val_len + 1); if (!val_heap) { if (key_ptr_allocated) flb_free(key_ptr); if (val_ptr_allocated) flb_free(val_ptr); @@ -445,7 +471,7 @@ static int load_csv(struct lookup_ctx *ctx) } memcpy(val_heap, val_ptr, val_len); val_heap[val_len] = '\0'; - int ret = flb_hash_table_add(ctx->ht, key_ptr, key_len, val_heap, val_len); + ret = flb_hash_table_add(ctx->ht, key_ptr, key_len, val_heap, val_len); if (ret < 0) { flb_free(val_heap); flb_plg_warn(ctx->ins, "Failed to add key '%.*s' (duplicate or error), skipping", (int)key_len, key_ptr); @@ -458,7 +484,7 @@ static int load_csv(struct lookup_ctx *ctx) continue; } /* Track allocated value for later cleanup */ - struct val_node *node = flb_malloc(sizeof(struct val_node)); + node = flb_malloc(sizeof(struct val_node)); if (node) { node->val = val_heap; mk_list_add(&node->_head, &ctx->val_list); @@ -502,20 +528,23 @@ static int cb_lookup_init(struct flb_filter_instance *ins, #ifdef FLB_HAVE_METRICS /* Initialize CMT metrics */ - ctx->cmt_processed = cmt_counter_create(ins->cmt, - "fluentbit", "filter", "lookup_processed_records_total", - "Total number of processed records", - 1, (char *[]) {"name"}); + { + static char* labels_name[] = {"name"}; + ctx->cmt_processed = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_processed_records_total", + "Total number of processed records", + 1, labels_name); - ctx->cmt_matched = cmt_counter_create(ins->cmt, - "fluentbit", "filter", "lookup_matched_records_total", - "Total number of matched records", - 1, (char *[]) {"name"}); + ctx->cmt_matched = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_matched_records_total", + "Total number of matched records", + 1, labels_name); - ctx->cmt_skipped = cmt_counter_create(ins->cmt, - "fluentbit", "filter", "lookup_skipped_records_total", - "Total number of skipped records due to errors", - 1, (char *[]) {"name"}); + ctx->cmt_skipped = cmt_counter_create(ins->cmt, + "fluentbit", "filter", "lookup_skipped_records_total", + "Total number of skipped records due to errors", + 1, labels_name); + } /* Add to old metrics system */ flb_metrics_add(FLB_LOOKUP_METRIC_PROCESSED, "processed_records_total", ins->metrics); @@ -543,7 +572,11 @@ static int cb_lookup_init(struct flb_filter_instance *ins, } /* Check file existence and readability */ +#ifdef _WIN32 + if (_access(ctx->file, 04) != 0) { /* 04 = R_OK on Windows */ +#else if (access(ctx->file, R_OK) != 0) { +#endif flb_plg_error(ins, "CSV file '%s' does not exist or is not readable: %s", ctx->file, strerror(errno)); goto error; } @@ -570,7 +603,7 @@ static int cb_lookup_init(struct flb_filter_instance *ins, if (ret < 0) { goto error; } - flb_plg_info(ins, "Loaded %d entries from CSV file '%s'", (int)ctx->ht->total_count, ctx->file); + flb_plg_info(ins, "Loaded %zu entries from CSV file '%s'", (size_t)ctx->ht->total_count, ctx->file); flb_plg_info(ins, "Lookup filter initialized: lookup_key='%s', result_key='%s', ignore_case=%s", ctx->lookup_key, ctx->result_key, ctx->ignore_case ? "true" : "false"); @@ -643,7 +676,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, char *lookup_val_str = NULL; size_t lookup_val_len = 0; int lookup_val_allocated = 0; - bool any_modified = false; /* Track if any records were modified */ + int any_modified = 0; /* Track if any records were modified */ /* Ensure context is valid */ if (!ctx) { @@ -668,12 +701,18 @@ static int cb_lookup_filter(const void *data, size_t bytes, /* Process each log event in the input batch */ while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + char *dynamic_val_buf; /* Track dynamic buffer for numeric conversions */ + int required_size; + int printed; + int ht_get_ret; + struct flb_ra_value *rval; + rec_num++; INCREMENT_PROCESSED_METRIC(ctx, ins); lookup_val_str = NULL; lookup_val_len = 0; lookup_val_allocated = 0; - char *dynamic_val_buf = NULL; /* Track dynamic buffer for numeric conversions */ + dynamic_val_buf = NULL; /* Helper macro to clean up dynamic buffer and allocated lookup strings */ #define CLEANUP_DYNAMIC_BUFFERS() do { \ @@ -695,7 +734,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, } /* Use record accessor to get the lookup value */ - struct flb_ra_value *rval = flb_ra_get_value_object(ctx->ra_lookup_key, *log_event.body); + rval = flb_ra_get_value_object(ctx->ra_lookup_key, *log_event.body); if (!rval) { /* Key not found, emit original record */ emit_original_record(&log_encoder, &log_event, ins, ctx, rec_num); @@ -714,7 +753,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, } else { /* Non-string value: convert to string using two-pass dynamic allocation */ - int required_size = 0; + required_size = 0; /* First pass: determine required buffer size */ switch (rval->type) { @@ -756,7 +795,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, /* Allocate buffer with required size plus null terminator */ dynamic_val_buf = flb_malloc(required_size + 1); if (!dynamic_val_buf) { - flb_plg_warn(ins, "Record %d: malloc failed for dynamic value buffer (size %d), skipping", rec_num, required_size + 1); + flb_plg_warn(ins, "Record %d: malloc failed for dynamic value buffer (size %zu), skipping", rec_num, (size_t)(required_size + 1)); INCREMENT_SKIPPED_METRIC(ctx, ins); CLEANUP_DYNAMIC_BUFFERS(); flb_ra_key_value_destroy(rval); @@ -765,7 +804,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, } /* Second pass: write to allocated buffer */ - int printed = 0; + printed = 0; switch (rval->type) { case FLB_RA_BOOL: printed = snprintf(dynamic_val_buf, required_size + 1, "%s", rval->o.via.boolean ? "true" : "false"); @@ -826,7 +865,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, * Attempt to find the lookup value in the hash table. * If not found, emit the original record unchanged. */ - int ht_get_ret = flb_hash_table_get(ctx->ht, lookup_val_str, lookup_val_len, &found_val, &found_len); + ht_get_ret = flb_hash_table_get(ctx->ht, lookup_val_str, lookup_val_len, &found_val, &found_len); if (ht_get_ret < 0 || !found_val || found_len == 0) { /* Not found, emit original record */ @@ -838,7 +877,7 @@ static int cb_lookup_filter(const void *data, size_t bytes, /* Match found - increment counter */ INCREMENT_MATCHED_METRIC(ctx, ins); - any_modified = true; /* Mark that we have modified records */ + any_modified = 1; /* Mark that we have modified records */ flb_plg_trace(ins, "Record %d: Found match for '%.*s' -> '%.*s'", rec_num, (int)lookup_val_len, lookup_val_str, (int)found_len, (char*)found_val); From d98e5956ece49669a6a6076f627bc70b093324ea Mon Sep 17 00:00:00 2001 From: Oleg Mukhin Date: Mon, 21 Jul 2025 14:55:51 +0100 Subject: [PATCH 4/4] filter_lookup: fix CentOS unit test compatibility issue - fix variable declarations and remove C99 features for unit tests - Conditional compilation for Windows for unit test features All 15 unit tests for filter passed. Signed-off-by: Oleg Mukhin --- tests/runtime/filter_lookup.c | 76 ++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/tests/runtime/filter_lookup.c b/tests/runtime/filter_lookup.c index f4ccbd39cf0..a1f4808316e 100644 --- a/tests/runtime/filter_lookup.c +++ b/tests/runtime/filter_lookup.c @@ -23,7 +23,11 @@ #include #include #include +#ifndef _WIN32 #include +#else +#include +#endif #include #include #include @@ -89,7 +93,7 @@ static void test_ctx_destroy(struct test_ctx *ctx) { TEST_CHECK(ctx != NULL); - sleep(1); + flb_time_msleep(1000); flb_stop(ctx->flb); flb_destroy(ctx->flb); flb_free(ctx); @@ -97,7 +101,11 @@ static void test_ctx_destroy(struct test_ctx *ctx) void delete_csv_file() { +#ifdef _WIN32 + _unlink(TMP_CSV_PATH); +#else unlink(TMP_CSV_PATH); +#endif } int create_csv_file(char *csv_content) @@ -342,7 +350,7 @@ void flb_test_lookup_large_numbers(void) struct test_ctx *ctx; struct flb_lib_out_cb cb_data; char large_number_str[64]; - snprintf(large_number_str, sizeof(large_number_str), "%" PRId64, LLONG_MAX); + snprintf(large_number_str, sizeof(large_number_str), "%lld", (long long)LLONG_MAX); char csv_content[256]; snprintf(csv_content, sizeof(csv_content), @@ -351,7 +359,7 @@ void flb_test_lookup_large_numbers(void) "456,Small Number\n", large_number_str); char input[128]; - snprintf(input, sizeof(input), "[0, {\"big_number\": %" PRId64 "}]", LLONG_MAX); + snprintf(input, sizeof(input), "[0, {\"big_number\": %lld}]", (long long)LLONG_MAX); cb_data.cb = cb_check_result_json; cb_data.data = "\"number_desc\":\"Very Large Number\""; @@ -516,8 +524,11 @@ void flb_test_lookup_long_csv_lines(void) fprintf(fp, "long_key,"); /* Write a very long value (> 4096 chars) */ - for (int i = 0; i < 100; i++) { - fprintf(fp, "This is a very long value that exceeds the original 4096 character buffer limit to test dynamic line reading functionality. "); + { + int i; + for (i = 0; i < 100; i++) { + fprintf(fp, "This is a very long value that exceeds the original 4096 character buffer limit to test dynamic line reading functionality. "); + } } fprintf(fp, "\n"); fprintf(fp, "short_key,Short Value\n"); @@ -649,9 +660,12 @@ void flb_test_dynamic_buffer(void) /* Test appending characters that will cause growth */ const char *test_str = "This is a test string that is longer than the initial capacity"; - for (size_t i = 0; test_str[i]; i++) { - ret = dynbuf_append_char(&buf, test_str[i]); - TEST_CHECK(ret == 0); + { + size_t i; + for (i = 0; test_str[i]; i++) { + ret = dynbuf_append_char(&buf, test_str[i]); + TEST_CHECK(ret == 0); + } } TEST_CHECK(strcmp(buf.data, test_str) == 0); @@ -735,8 +749,11 @@ void flb_test_lookup_large_csv(void) fprintf(fp, "key,value\n"); /* Write 10,000 test entries */ - for (int i = 1; i <= 10000; i++) { - fprintf(fp, "user%d,User %d\n", i, i); + { + int i; + for (i = 1; i <= 10000; i++) { + fprintf(fp, "user%d,User %d\n", i, i); + } } fclose(fp); @@ -907,13 +924,13 @@ void flb_test_lookup_metrics_matched(void) TEST_CHECK(skipped == 0); if (processed != 3) { - TEST_MSG("Expected processed=3, got %" PRIu64, processed); + TEST_MSG("Expected processed=3, got %llu", (unsigned long long)processed); } if (matched != 2) { - TEST_MSG("Expected matched=2, got %" PRIu64, matched); + TEST_MSG("Expected matched=2, got %llu", (unsigned long long)matched); } if (skipped != 0) { - TEST_MSG("Expected skipped=0, got %" PRIu64, skipped); + TEST_MSG("Expected skipped=0, got %llu", (unsigned long long)skipped); } delete_csv_file(); @@ -962,19 +979,24 @@ void flb_test_lookup_metrics_processed(void) /* Send 20 matching records and 10 non-matching records */ const int matching_count = 20; const int non_matching_count = 10; - - for (int i = 0; i < matching_count; i++) { - char input[256]; - snprintf(input, sizeof(input), "[0, {\"test_key\": \"match_key\", \"seq\": %d}]", i); - bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); - TEST_CHECK(bytes == strlen(input)); + { + int i; + for (i = 0; i < matching_count; i++) { + char input[256]; + snprintf(input, sizeof(input), "[0, {\"test_key\": \"match_key\", \"seq\": %d}]", i); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + } } - for (int i = 0; i < non_matching_count; i++) { - char input[256]; - snprintf(input, sizeof(input), "[0, {\"test_key\": \"no_match_%d\", \"seq\": %d}]", i, i); - bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); - TEST_CHECK(bytes == strlen(input)); + { + int i; + for (i = 0; i < non_matching_count; i++) { + char input[256]; + snprintf(input, sizeof(input), "[0, {\"test_key\": \"no_match_%d\", \"seq\": %d}]", i, i); + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, input, strlen(input)); + TEST_CHECK(bytes == strlen(input)); + } } flb_time_msleep(3000); /* Give more time for processing large volume */ @@ -989,13 +1011,13 @@ void flb_test_lookup_metrics_processed(void) TEST_CHECK(skipped == 0); if (processed != matching_count + non_matching_count) { - TEST_MSG("Expected processed=%d, got %" PRIu64, matching_count + non_matching_count, processed); + TEST_MSG("Expected processed=%d, got %llu", matching_count + non_matching_count, (unsigned long long)processed); } if (matched != matching_count) { - TEST_MSG("Expected matched=%d, got %" PRIu64, matching_count, matched); + TEST_MSG("Expected matched=%d, got %llu", matching_count, (unsigned long long)matched); } if (skipped != 0) { - TEST_MSG("Expected skipped=0, got %" PRIu64, skipped); + TEST_MSG("Expected skipped=0, got %llu", (unsigned long long)skipped); } delete_csv_file();