From e7c5ff56f1fd605bb46701cfe2aaf0fb5eb19759 Mon Sep 17 00:00:00 2001 From: Zhihong Lin Date: Fri, 11 Jul 2025 17:35:41 -0400 Subject: [PATCH 1/6] out_cloudwatch: add entity support and remove unnecessary log content Signed-off-by: Zhihong Lin --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 438 +++++++++++++++++- plugins/out_cloudwatch_logs/cloudwatch_api.h | 10 + plugins/out_cloudwatch_logs/cloudwatch_logs.c | 39 ++ plugins/out_cloudwatch_logs/cloudwatch_logs.h | 48 ++ 4 files changed, 534 insertions(+), 1 deletion(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index fe24f3937d5..16123e634d6 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -199,12 +199,153 @@ static inline int try_to_write(char *buf, int *off, size_t left, return FLB_TRUE; } +static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { + char ts[KEY_ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"keyAttributes\":{",0)) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"Type\":\"Service\"",0)) { + goto error; + } + if(stream->entity->key_attributes->name != NULL && strlen(stream->entity->key_attributes->name) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Name\":\"",stream->entity->key_attributes->name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->environment != NULL && strlen(stream->entity->key_attributes->environment) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Environment\":\"",stream->entity->key_attributes->environment,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->account_id != NULL && strlen(stream->entity->key_attributes->account_id) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "},", 2)) { + goto error; + } + return 0; +error: + return -1; +} + +static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream,int *offset) { + char ts[ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"attributes\":{", + 0)) { + goto error; + } + if (stream->entity->attributes->platform_type != NULL && strlen(stream->entity->attributes->platform_type) != 0) { + if (strcmp(stream->entity->attributes->platform_type, "eks") == 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","AWS::EKS","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } else if (strcmp(stream->entity->attributes->platform_type, "k8s") == 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","K8s","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } + } else { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","Generic","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->namespace != NULL && strlen(stream->entity->attributes->namespace) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->node != NULL && strlen(stream->entity->attributes->node) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Node\":\"",buf->current_stream->entity->attributes->node,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->name_source != NULL && strlen(stream->entity->attributes->name_source) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AWS.ServiceNameSource\":\"",buf->current_stream->entity->attributes->name_source,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "}", 1)) { + goto error; + } + return 0; +error: + return -1; +} + /* * Writes the "header" for a put log events payload */ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { + int ret; if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "{\"logGroupName\":\"", 17)) { goto error; @@ -229,6 +370,35 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, "\",", 2)) { goto error; } + // If we are missing the service name, the entity will get rejected by the frontend anyway + // so do not emit entity unless service name is filled. If we are missing account ID + // it is considered not having sufficient information for entity therefore we should drop the entity. + if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL && stream->entity->key_attributes->account_id != NULL) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"entity\":{", 10)) { + goto error; + } + + if(stream->entity->key_attributes != NULL) { + ret = entity_add_key_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity KeyAttributes"); + goto error; + } + } + if(stream->entity->attributes != NULL) { + ret = entity_add_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity Attributes"); + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "},", 2)) { + goto error; + } + + } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"logEvents\":[", 13)) { @@ -366,6 +536,69 @@ static int truncate_log(const struct flb_cloudwatch *ctx, const char *log_buffer return FLB_FALSE; } +/* + * Helper function to remove keys prefixed with aws_entity + * from a message pack map + */ +void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, int filtered_fields) { + const int remaining_kv_pairs = nested_map->size - filtered_fields; + uint32_t j; + + // Pack the updated nested map into the packer, skipping keys in the remove list + msgpack_pack_map(pk, remaining_kv_pairs); // Initial size will adjust in the next loop + + for (j = 0; j < nested_map->size; j++) { + msgpack_object_kv nested_kv = nested_map->ptr[j]; + + // Check if the current key is in the removal list + if (nested_kv.key.type == MSGPACK_OBJECT_STR && nested_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(nested_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + // Skip the key in the remove list + continue; + } + + // Pack the remaining key-value pairs into the packer + msgpack_pack_object(pk, nested_kv.key); + msgpack_pack_object(pk, nested_kv.val); + } +} + +/* + * Main function to remove keys prefixed with aws_entity + * from the root and nested message pack map + */ +void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, msgpack_packer *pk,int root_filtered_fields, int filtered_fields) { + uint32_t i; + + if (root_map->type == MSGPACK_OBJECT_MAP) { + msgpack_object_map root = root_map->via.map; + + // Prepare to pack the modified root map (size may be unchanged or reduced) + msgpack_pack_map(pk, root.size-root_filtered_fields); + + for (i = 0; i < root.size; i++) { + msgpack_object_kv root_kv = root.ptr[i]; + + // Check if this key matches the nested map key (e.g., "kubernetes") + if (filtered_fields > 0 && + root_kv.key.type == MSGPACK_OBJECT_STR && + strncmp(root_kv.key.via.str.ptr, nested_map_key, root_kv.key.via.str.size) == 0 && + root_kv.val.type == MSGPACK_OBJECT_MAP) { + + // Pack the nested map key + msgpack_pack_object(pk, root_kv.key); + + // Remove the unneeded key from the nested map + remove_key_from_nested_map(&root_kv.val.via.map, pk,filtered_fields); + } else if (root_filtered_fields > 0 && root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + } else { + // Pack other key-value pairs unchanged + msgpack_pack_object(pk, root_kv.key); + msgpack_pack_object(pk, root_kv.val); + } + } + } +} + /* * Processes the msgpack object @@ -786,6 +1019,175 @@ int pack_emf_payload(struct flb_cloudwatch *ctx, return 0; } +static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entity) { + if(!ctx->add_entity || entity == NULL) { + return NULL; + } + char *fallback_env = NULL; + int ret; + /* + * Possible fallback environments: + * 1. eks:cluster-name/namespace + * 2. k8s:cluster-name/namespace + */ + if (entity->attributes->platform_type != NULL && entity->attributes->cluster_name != NULL && entity->attributes->namespace != NULL) { + /* Calculate required length */ + /* Add 3 for ':' '/' and null terminator */ + size_t len = strlen(entity->attributes->platform_type) + + strlen(entity->attributes->cluster_name) + + strlen(entity->attributes->namespace) + 3; + + fallback_env = flb_malloc(len); + if (!fallback_env) { + return NULL; + } + + /* Use snprintf for cross-platform compatibility */ + ret = snprintf(fallback_env, len, "%s:%s/%s", entity->attributes->platform_type, entity->attributes->cluster_name, entity->attributes->namespace); + if (ret < 0 || ret >= len) { + flb_free(fallback_env); + return NULL; + } + + return fallback_env; + } + return NULL; +} + +void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map, int map_size) { + int i,j; + msgpack_object key, kube_key; + msgpack_object val, kube_val; + + int val_map_size; + for(i=0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + if(strncmp(key.via.str.ptr, "kubernetes",10 ) == 0 ) { + if (val.type == MSGPACK_OBJECT_MAP) { + val_map_size = val.via.map.size; + for (j=0; j < val_map_size; j++) { + kube_key = val.via.map.ptr[j].key; + kube_val = val.via.map.ptr[j].val; + if(strncmp(kube_key.via.str.ptr, "aws_entity_service_name", kube_key.via.str.size) == 0) { + if(!entity->service_name_found) { + entity->filter_count++; + entity->service_name_found++; + } + if(entity->key_attributes->name != NULL) { + flb_free(entity->key_attributes->name); + } + entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", kube_key.via.str.size) == 0) { + if(!entity->environment_found) { + entity->filter_count++; + entity->environment_found++; + } + if(entity->key_attributes->environment != NULL) { + flb_free(entity->key_attributes->environment); + } + entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "namespace_name", kube_key.via.str.size) == 0) { + if(entity->attributes->namespace != NULL) { + flb_free(entity->attributes->namespace); + } + entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "host", kube_key.via.str.size) == 0) { + if(entity->attributes->node != NULL) { + flb_free(entity->attributes->node); + } + entity->attributes->node = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_cluster", kube_key.via.str.size) == 0) { + if(entity->attributes->cluster_name == NULL) { + entity->filter_count++; + } else { + flb_free(entity->attributes->cluster_name); + } + entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_workload", kube_key.via.str.size) == 0) { + if(entity->attributes->workload == NULL) { + entity->filter_count++; + } else { + flb_free(entity->attributes->workload); + } + entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", kube_key.via.str.size) == 0) { + if(!entity->name_source_found) { + entity->filter_count++; + entity->name_source_found++; + } + if(entity->attributes->name_source != NULL) { + flb_free(entity->attributes->name_source); + } + entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_platform", kube_key.via.str.size) == 0) { + if(entity->attributes->platform_type == NULL) { + entity->filter_count++; + } else { + flb_free(entity->attributes->platform_type); + } + entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } + } + } + } + if(strncmp(key.via.str.ptr, "aws_entity_ec2_instance_id",key.via.str.size ) == 0 ) { + if(entity->attributes->instance_id == NULL) { + entity->root_filter_count++; + } else { + flb_free(entity->attributes->instance_id); + } + entity->attributes->instance_id = flb_strndup(val.via.str.ptr, val.via.str.size); + } + if(strncmp(key.via.str.ptr, "aws_entity_account_id",key.via.str.size ) == 0 ) { + if(entity->key_attributes->account_id == NULL) { + entity->root_filter_count++; + } else { + flb_free(entity->key_attributes->account_id); + } + entity->key_attributes->account_id = flb_strndup(val.via.str.ptr, val.via.str.size); + } + } + if(entity->key_attributes->name == NULL && entity->attributes->name_source == NULL &&entity->attributes->workload != NULL) { + entity->key_attributes->name = flb_strndup(entity->attributes->workload, strlen(entity->attributes->workload)); + entity->attributes->name_source = flb_strndup("K8sWorkload", 11); + } + if(entity->key_attributes->environment == NULL) { + entity->key_attributes->environment = find_fallback_environment(ctx, entity); + } +} + +void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stream, const msgpack_object map) { + if(stream->entity == NULL) { + stream->entity = flb_malloc(sizeof(entity)); + if (stream->entity == NULL) { + return; + } + memset(stream->entity, 0, sizeof(entity)); + + stream->entity->key_attributes = flb_malloc(sizeof(entity_key_attributes)); + if (stream->entity->key_attributes == NULL) { + return; + } + memset(stream->entity->key_attributes, 0, sizeof(entity_key_attributes)); + + stream->entity->attributes = flb_malloc(sizeof(entity_attributes)); + if (stream->entity->attributes == NULL) { + return; + } + memset(stream->entity->attributes, 0, sizeof(entity_attributes)); + stream->entity->filter_count = 0; + stream->entity->root_filter_count = 0; + stream->entity->service_name_found = 0; + stream->entity->environment_found = 0; + stream->entity->name_source_found = 0; + } + parse_entity(ctx,stream->entity,map, map.via.map.size); + if (!stream->entity) { + flb_plg_warn(ctx->ins, "Failed to generate entity"); + } +} + static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plugin, struct cw_flush *buf, flb_sds_t tag, const char *data, size_t bytes) @@ -800,6 +1202,12 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug msgpack_object emf_payload; /* msgpack::sbuffer is a simple buffer implementation. */ msgpack_sbuffer mp_sbuf; + /* + * Msgpack objects used to store msgpack after filtering out fields + * with aws entity prefix + */ + msgpack_sbuffer filtered_sbuf; + msgpack_unpacked modified_unpacked; struct log_stream *stream; @@ -848,11 +1256,30 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug map = *log_event.body; map_size = map.via.map.size; + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_init(&filtered_sbuf); + msgpack_unpacked_init(&modified_unpacked); + } stream = get_log_stream(ctx, tag, map); if (!stream) { flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); goto error; } + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + update_or_create_entity(ctx,stream,map); + // Prepare a buffer to pack the modified map + if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) { + msgpack_packer pk; + msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write); + remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count); + + // Now, unpack the modified data into a new msgpack_object + size_t modified_offset = 0; + if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, filtered_sbuf.size, &modified_offset)) { + map = modified_unpacked.data; + } + } + } if (ctx->log_key) { key_str = NULL; @@ -974,6 +1401,10 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug if (ret == 0) { i++; } + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } } flb_log_event_decoder_destroy(&log_decoder); @@ -981,7 +1412,10 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug error: flb_log_event_decoder_destroy(&log_decoder); - + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } return -1; } @@ -1537,6 +1971,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, if (c) { flb_plg_debug(ctx->ins, "PutLogEvents http status=%d", c->resp.status); + flb_plg_debug(ctx->ins, "PutLogEvents http data=%s", c->resp.data); + flb_plg_debug(ctx->ins, "PutLogEvents http payload=%s", c->resp.payload); if (c->resp.status == 200) { if (c->resp.data == NULL || c->resp.data_len == 0 || strcasestr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) { diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index 05abfff30a1..280b60bd485 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -35,9 +35,19 @@ /* number of characters needed to 'end' a PutLogEvents payload */ #define PUT_LOG_EVENTS_FOOTER_LEN 4 +// https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html +/* Maxinum number of character limits including both the KeyAttributes key and its value */ +#define KEY_ATTRIBUTES_MAX_LEN 1100 +/* Maxinum number of character limits including both the Attributes key and its value */ +#define ATTRIBUTES_MAX_LEN 300 + /* 256KiB minus 26 bytes for the event */ #define MAX_EVENT_LEN 262118 +/* Prefix used for entity fields only */ +#define AWS_ENTITY_PREFIX "aws_entity" +#define AWS_ENTITY_PREFIX_LEN 10 + #include "cloudwatch_logs.h" void cw_flush_destroy(struct cw_flush *buf); diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index c5e808ae141..02bcfa681e2 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -378,6 +378,15 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, flb_output_upstream_set(upstream, ctx->ins); ctx->cw_client->host = ctx->endpoint; + struct mk_list *head; + struct flb_filter_instance *f_ins; + mk_list_foreach(head, &config->filters) { + f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (strstr(f_ins->p->name, "kubernetes")) { + ctx->kubernete_metadata_enabled = true; + } + } + /* Export context */ flb_output_set_context(ins, ctx); @@ -530,6 +539,27 @@ static int cb_cloudwatch_exit(void *data, struct flb_config *config) return 0; } +void entity_destroy(entity *entity) { + if(entity->attributes) { + flb_free(entity->attributes->cluster_name); + flb_free(entity->attributes->instance_id); + flb_free(entity->attributes->namespace); + flb_free(entity->attributes->node); + flb_free(entity->attributes->platform_type); + flb_free(entity->attributes->workload); + flb_free(entity->attributes->name_source); + flb_free(entity->attributes); + } + if(entity->key_attributes) { + flb_free(entity->key_attributes->environment); + flb_free(entity->key_attributes->name); + flb_free(entity->key_attributes->type); + flb_free(entity->key_attributes->account_id); + flb_free(entity->key_attributes); + } + flb_free(entity); +} + void log_stream_destroy(struct log_stream *stream) { if (stream) { @@ -539,6 +569,9 @@ void log_stream_destroy(struct log_stream *stream) if (stream->group) { flb_sds_destroy(stream->group); } + if (stream->entity) { + entity_destroy(stream->entity); + } flb_free(stream); } } @@ -689,6 +722,12 @@ static struct flb_config_map config_map[] = { "Specify the log storage class. Valid values are STANDARD (default) and INFREQUENT_ACCESS." }, + { + FLB_CONFIG_MAP_BOOL, "add_entity", "false", + 0, FLB_TRUE, offsetof(struct flb_cloudwatch, add_entity), + "add entity to PutLogEvent calls" + }, + /* EOF */ {0} }; diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 3724863426a..1971cffa2b4 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -30,6 +30,39 @@ #include #include +/* Entity object used for associating the telemetry + * in the PutLogEvent call*/ +typedef struct entity { + struct entity_key_attributes *key_attributes; + struct entity_attributes *attributes; + int filter_count; + int service_name_found; + int environment_found; + int name_source_found; + int root_filter_count; +}entity; + +/* KeyAttributes used for CloudWatch Entity object + * in the PutLogEvent call*/ +typedef struct entity_key_attributes { + char *type; + char *name; + char *environment; + char *account_id; +}entity_key_attributes; + +/* Attributes used for CloudWatch Entity object + * in the PutLogEvent call*/ +typedef struct entity_attributes { + char *platform_type; + char *cluster_name; + char *namespace; + char *workload; + char *node; + char *instance_id; + char *name_source; +}entity_attributes; + #define LOG_CLASS_STANDARD "STANDARD" #define LOG_CLASS_STANDARD_LEN 8 #define LOG_CLASS_INFREQUENT_ACCESS "INFREQUENT_ACCESS" @@ -94,6 +127,13 @@ struct log_stream { unsigned long long oldest_event; unsigned long long newest_event; + /* + * PutLogEvents entity object + * variable that store service or infrastructure + * information + */ + struct entity *entity; + struct mk_list _head; }; @@ -159,6 +199,14 @@ struct flb_cloudwatch { /* Plugin output instance reference */ struct flb_output_instance *ins; + + /* Checks if kubernete filter is enabled + * So the plugin knows when to scrape for Entity + */ + + int kubernete_metadata_enabled; + + int add_entity; }; void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx); From aeee49c43f74afe2345da0523dee7dbd4f7d4fba Mon Sep 17 00:00:00 2001 From: Zhihong Lin Date: Tue, 29 Jul 2025 14:23:59 -0400 Subject: [PATCH 2/6] out_cloudwatch: fix code styles to follow upstream standard Signed-off-by: Zhihong Lin --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 266 ++++++++++++------ plugins/out_cloudwatch_logs/cloudwatch_api.h | 8 +- plugins/out_cloudwatch_logs/cloudwatch_logs.c | 3 +- plugins/out_cloudwatch_logs/cloudwatch_logs.h | 21 +- 4 files changed, 201 insertions(+), 97 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 16123e634d6..f97bf21e93b 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -199,7 +199,9 @@ static inline int try_to_write(char *buf, int *off, size_t left, return FLB_TRUE; } -static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { +static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, + struct log_stream *stream, int *offset) +{ char ts[KEY_ATTRIBUTES_MAX_LEN]; if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"keyAttributes\":{",0)) { @@ -209,24 +211,30 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush "\"Type\":\"Service\"",0)) { goto error; } - if(stream->entity->key_attributes->name != NULL && strlen(stream->entity->key_attributes->name) != 0) { - if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Name\":\"",stream->entity->key_attributes->name,"\"")) { + if(stream->entity->key_attributes->name != NULL && + strlen(stream->entity->key_attributes->name) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"Name\":\"",stream->entity->key_attributes->name,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->key_attributes->environment != NULL && strlen(stream->entity->key_attributes->environment) != 0) { - if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Environment\":\"",stream->entity->key_attributes->environment,"\"")) { + if(stream->entity->key_attributes->environment != NULL && + strlen(stream->entity->key_attributes->environment) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"Environment\":\"",stream->entity->key_attributes->environment,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->key_attributes->account_id != NULL && strlen(stream->entity->key_attributes->account_id) != 0) { - if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"")) { + if(stream->entity->key_attributes->account_id != NULL && + strlen(stream->entity->key_attributes->account_id) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -242,38 +250,48 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush return -1; } -static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream,int *offset) { +static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, + struct log_stream *stream,int *offset) +{ char ts[ATTRIBUTES_MAX_LEN]; if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"attributes\":{", 0)) { goto error; } - if (stream->entity->attributes->platform_type != NULL && strlen(stream->entity->attributes->platform_type) != 0) { + if (stream->entity->attributes->platform_type != NULL && + strlen(stream->entity->attributes->platform_type) != 0) { if (strcmp(stream->entity->attributes->platform_type, "eks") == 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","AWS::EKS","\"")) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","AWS::EKS","\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } - if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + if(stream->entity->attributes->cluster_name != NULL && + strlen(stream->entity->attributes->cluster_name) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - } else if (strcmp(stream->entity->attributes->platform_type, "k8s") == 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","K8s","\"")) { + } + else if (strcmp(stream->entity->attributes->platform_type, "k8s") == 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","K8s","\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } - if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + if(stream->entity->attributes->cluster_name != NULL && + strlen(stream->entity->attributes->cluster_name) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -281,48 +299,60 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } } } - } else { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","Generic","\"")) { + } + else { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","Generic","\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->attributes->namespace != NULL && strlen(stream->entity->attributes->namespace) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"")) { + if(stream->entity->attributes->namespace != NULL && + strlen(stream->entity->attributes->namespace) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->attributes->node != NULL && strlen(stream->entity->attributes->node) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Node\":\"",buf->current_stream->entity->attributes->node,"\"")) { + if(stream->entity->attributes->node != NULL && + strlen(stream->entity->attributes->node) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Node\":\"",buf->current_stream->entity->attributes->node,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) { + if(stream->entity->attributes->workload != NULL && + strlen(stream->entity->attributes->workload) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) { + if(stream->entity->attributes->instance_id != NULL && + strlen(stream->entity->attributes->instance_id) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { goto error; } } - if(stream->entity->attributes->name_source != NULL && strlen(stream->entity->attributes->name_source) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AWS.ServiceNameSource\":\"",buf->current_stream->entity->attributes->name_source,"\"")) { + if(stream->entity->attributes->name_source != NULL && + strlen(stream->entity->attributes->name_source) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"AWS.ServiceNameSource\":\"",buf->current_stream->entity->attributes->name_source,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -370,10 +400,16 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, "\",", 2)) { goto error; } - // If we are missing the service name, the entity will get rejected by the frontend anyway - // so do not emit entity unless service name is filled. If we are missing account ID - // it is considered not having sufficient information for entity therefore we should drop the entity. - if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL && stream->entity->key_attributes->account_id != NULL) { + /* + * If we are missing the service name, the entity will get rejected by the frontend + * anyway so do not emit entity unless service name is filled. If we are missing + * account ID, it is considered not having sufficient information for entity + * therefore we should drop the entity. + */ + if(ctx->add_entity && stream->entity != NULL && + stream->entity->key_attributes != NULL && + stream->entity->key_attributes->name != NULL && + stream->entity->key_attributes->account_id != NULL) { if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"entity\":{", 10)) { goto error; @@ -540,23 +576,28 @@ static int truncate_log(const struct flb_cloudwatch *ctx, const char *log_buffer * Helper function to remove keys prefixed with aws_entity * from a message pack map */ -void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, int filtered_fields) { +void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, + int filtered_fields) +{ const int remaining_kv_pairs = nested_map->size - filtered_fields; uint32_t j; - // Pack the updated nested map into the packer, skipping keys in the remove list - msgpack_pack_map(pk, remaining_kv_pairs); // Initial size will adjust in the next loop + /* Pack the updated nested map into the packer, skipping keys in the remove list */ + msgpack_pack_map(pk, remaining_kv_pairs); for (j = 0; j < nested_map->size; j++) { msgpack_object_kv nested_kv = nested_map->ptr[j]; - // Check if the current key is in the removal list - if (nested_kv.key.type == MSGPACK_OBJECT_STR && nested_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(nested_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { - // Skip the key in the remove list + /* Check if the current key is in the removal list */ + if (nested_kv.key.type == MSGPACK_OBJECT_STR && + nested_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && + strncmp(nested_kv.key.via.str.ptr, + AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + /* Skip the key in the remove list */ continue; } - // Pack the remaining key-value pairs into the packer + /* Pack the remaining key-value pairs into the packer */ msgpack_pack_object(pk, nested_kv.key); msgpack_pack_object(pk, nested_kv.val); } @@ -566,32 +607,38 @@ void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer * * Main function to remove keys prefixed with aws_entity * from the root and nested message pack map */ -void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, msgpack_packer *pk,int root_filtered_fields, int filtered_fields) { +void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, + msgpack_packer *pk,int root_filtered_fields, int filtered_fields) +{ uint32_t i; if (root_map->type == MSGPACK_OBJECT_MAP) { msgpack_object_map root = root_map->via.map; - // Prepare to pack the modified root map (size may be unchanged or reduced) + /* Prepare to pack the modified root map (size may be unchanged or reduced) */ msgpack_pack_map(pk, root.size-root_filtered_fields); for (i = 0; i < root.size; i++) { msgpack_object_kv root_kv = root.ptr[i]; - // Check if this key matches the nested map key (e.g., "kubernetes") + /* Check if this key matches the nested map key (e.g., "kubernetes") */ if (filtered_fields > 0 && root_kv.key.type == MSGPACK_OBJECT_STR && - strncmp(root_kv.key.via.str.ptr, nested_map_key, root_kv.key.via.str.size) == 0 && + strncmp(root_kv.key.via.str.ptr, + nested_map_key, root_kv.key.via.str.size) == 0 && root_kv.val.type == MSGPACK_OBJECT_MAP) { - // Pack the nested map key msgpack_pack_object(pk, root_kv.key); - // Remove the unneeded key from the nested map remove_key_from_nested_map(&root_kv.val.via.map, pk,filtered_fields); - } else if (root_filtered_fields > 0 && root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { - } else { - // Pack other key-value pairs unchanged + } + else if (root_filtered_fields > 0 && + root_kv.key.type == MSGPACK_OBJECT_STR && + root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && + strncmp(root_kv.key.via.str.ptr, + AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + } + else { msgpack_pack_object(pk, root_kv.key); msgpack_pack_object(pk, root_kv.val); } @@ -1019,7 +1066,8 @@ int pack_emf_payload(struct flb_cloudwatch *ctx, return 0; } -static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entity) { +static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entity) +{ if(!ctx->add_entity || entity == NULL) { return NULL; } @@ -1030,9 +1078,13 @@ static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entit * 1. eks:cluster-name/namespace * 2. k8s:cluster-name/namespace */ - if (entity->attributes->platform_type != NULL && entity->attributes->cluster_name != NULL && entity->attributes->namespace != NULL) { - /* Calculate required length */ - /* Add 3 for ':' '/' and null terminator */ + if (entity->attributes->platform_type != NULL && + entity->attributes->cluster_name != NULL && + entity->attributes->namespace != NULL) { + /* + * Calculate required length + * Add 3 for ':' '/' and null terminator + */ size_t len = strlen(entity->attributes->platform_type) + strlen(entity->attributes->cluster_name) + strlen(entity->attributes->namespace) + 3; @@ -1043,7 +1095,9 @@ static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entit } /* Use snprintf for cross-platform compatibility */ - ret = snprintf(fallback_env, len, "%s:%s/%s", entity->attributes->platform_type, entity->attributes->cluster_name, entity->attributes->namespace); + ret = snprintf(fallback_env, len, "%s:%s/%s", + entity->attributes->platform_type, entity->attributes->cluster_name, + entity->attributes->namespace); if (ret < 0 || ret >= len) { flb_free(fallback_env); return NULL; @@ -1054,7 +1108,9 @@ static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entit return NULL; } -void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map, int map_size) { +void parse_entity(struct flb_cloudwatch *ctx, entity *entity, + msgpack_object map, int map_size) +{ int i,j; msgpack_object key, kube_key; msgpack_object val, kube_val; @@ -1069,7 +1125,8 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map for (j=0; j < val_map_size; j++) { kube_key = val.via.map.ptr[j].key; kube_val = val.via.map.ptr[j].val; - if(strncmp(kube_key.via.str.ptr, "aws_entity_service_name", kube_key.via.str.size) == 0) { + if(strncmp(kube_key.via.str.ptr, + "aws_entity_service_name", kube_key.via.str.size) == 0) { if(!entity->service_name_found) { entity->filter_count++; entity->service_name_found++; @@ -1077,8 +1134,11 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map if(entity->key_attributes->name != NULL) { flb_free(entity->key_attributes->name); } - entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", kube_key.via.str.size) == 0) { + entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", + kube_key.via.str.size) == 0) { if(!entity->environment_found) { entity->filter_count++; entity->environment_found++; @@ -1086,32 +1146,49 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map if(entity->key_attributes->environment != NULL) { flb_free(entity->key_attributes->environment); } - entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "namespace_name", kube_key.via.str.size) == 0) { + entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "namespace_name", + kube_key.via.str.size) == 0) { if(entity->attributes->namespace != NULL) { flb_free(entity->attributes->namespace); } - entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "host", kube_key.via.str.size) == 0) { + entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "host", + kube_key.via.str.size) == 0) { if(entity->attributes->node != NULL) { flb_free(entity->attributes->node); } - entity->attributes->node = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "aws_entity_cluster", kube_key.via.str.size) == 0) { + entity->attributes->node = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "aws_entity_cluster", + kube_key.via.str.size) == 0) { if(entity->attributes->cluster_name == NULL) { entity->filter_count++; - } else { + } + else { flb_free(entity->attributes->cluster_name); } - entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "aws_entity_workload", kube_key.via.str.size) == 0) { + entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "aws_entity_workload", + kube_key.via.str.size) == 0) { if(entity->attributes->workload == NULL) { entity->filter_count++; - } else { + } + else { flb_free(entity->attributes->workload); } - entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", kube_key.via.str.size) == 0) { + entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", + kube_key.via.str.size) == 0) { if(!entity->name_source_found) { entity->filter_count++; entity->name_source_found++; @@ -1119,37 +1196,50 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map if(entity->attributes->name_source != NULL) { flb_free(entity->attributes->name_source); } - entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); - } else if(strncmp(kube_key.via.str.ptr, "aws_entity_platform", kube_key.via.str.size) == 0) { + entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); + } + else if(strncmp(kube_key.via.str.ptr, "aws_entity_platform", + kube_key.via.str.size) == 0) { if(entity->attributes->platform_type == NULL) { entity->filter_count++; - } else { + } + else { flb_free(entity->attributes->platform_type); } - entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, + kube_val.via.str.size); } } } } - if(strncmp(key.via.str.ptr, "aws_entity_ec2_instance_id",key.via.str.size ) == 0 ) { + if(strncmp(key.via.str.ptr, "aws_entity_ec2_instance_id", + key.via.str.size ) == 0 ) { if(entity->attributes->instance_id == NULL) { entity->root_filter_count++; - } else { + } + else { flb_free(entity->attributes->instance_id); } - entity->attributes->instance_id = flb_strndup(val.via.str.ptr, val.via.str.size); + entity->attributes->instance_id = flb_strndup(val.via.str.ptr, + val.via.str.size); } if(strncmp(key.via.str.ptr, "aws_entity_account_id",key.via.str.size ) == 0 ) { if(entity->key_attributes->account_id == NULL) { entity->root_filter_count++; - } else { + } + else { flb_free(entity->key_attributes->account_id); } - entity->key_attributes->account_id = flb_strndup(val.via.str.ptr, val.via.str.size); + entity->key_attributes->account_id = flb_strndup(val.via.str.ptr, + val.via.str.size); } } - if(entity->key_attributes->name == NULL && entity->attributes->name_source == NULL &&entity->attributes->workload != NULL) { - entity->key_attributes->name = flb_strndup(entity->attributes->workload, strlen(entity->attributes->workload)); + if(entity->key_attributes->name == NULL && + entity->attributes->name_source == NULL && + entity->attributes->workload != NULL) { + entity->key_attributes->name = flb_strndup(entity->attributes->workload, + strlen(entity->attributes->workload)); entity->attributes->name_source = flb_strndup("K8sWorkload", 11); } if(entity->key_attributes->environment == NULL) { @@ -1157,7 +1247,9 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map } } -void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stream, const msgpack_object map) { +void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stream, + const msgpack_object map) +{ if(stream->entity == NULL) { stream->entity = flb_malloc(sizeof(entity)); if (stream->entity == NULL) { @@ -1267,15 +1359,17 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug } if(ctx->kubernete_metadata_enabled && ctx->add_entity) { update_or_create_entity(ctx,stream,map); - // Prepare a buffer to pack the modified map - if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) { + if(stream->entity != NULL && + (stream->entity->root_filter_count > 0 || + stream->entity->filter_count > 0)) { msgpack_packer pk; msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write); - remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count); + remove_unneeded_field(&map, "kubernetes",&pk, + stream->entity->root_filter_count, stream->entity->filter_count); - // Now, unpack the modified data into a new msgpack_object size_t modified_offset = 0; - if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, filtered_sbuf.size, &modified_offset)) { + if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, + filtered_sbuf.size, &modified_offset)) { map = modified_unpacked.data; } } diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index 280b60bd485..697b15155cd 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -35,10 +35,12 @@ /* number of characters needed to 'end' a PutLogEvents payload */ #define PUT_LOG_EVENTS_FOOTER_LEN 4 -// https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html -/* Maxinum number of character limits including both the KeyAttributes key and its value */ +/* + * https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html + * Maximum number of character limits including both the KeyAttributes key and its value + */ #define KEY_ATTRIBUTES_MAX_LEN 1100 -/* Maxinum number of character limits including both the Attributes key and its value */ +/* Maximum number of character limits including both the Attributes key and its value */ #define ATTRIBUTES_MAX_LEN 300 /* 256KiB minus 26 bytes for the event */ diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index 02bcfa681e2..4ab884ba9a0 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -539,7 +539,8 @@ static int cb_cloudwatch_exit(void *data, struct flb_config *config) return 0; } -void entity_destroy(entity *entity) { +void entity_destroy(entity *entity) +{ if(entity->attributes) { flb_free(entity->attributes->cluster_name); flb_free(entity->attributes->instance_id); diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 1971cffa2b4..9381e1f1376 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -30,8 +30,10 @@ #include #include -/* Entity object used for associating the telemetry - * in the PutLogEvent call*/ +/* + * Entity object used for associating the telemetry + * in the PutLogEvent call + */ typedef struct entity { struct entity_key_attributes *key_attributes; struct entity_attributes *attributes; @@ -42,8 +44,10 @@ typedef struct entity { int root_filter_count; }entity; -/* KeyAttributes used for CloudWatch Entity object - * in the PutLogEvent call*/ +/* + * KeyAttributes used for CloudWatch Entity object + * in the PutLogEvent call + */ typedef struct entity_key_attributes { char *type; char *name; @@ -51,8 +55,10 @@ typedef struct entity_key_attributes { char *account_id; }entity_key_attributes; -/* Attributes used for CloudWatch Entity object - * in the PutLogEvent call*/ +/* + * Attributes used for CloudWatch Entity object + * in the PutLogEvent call + */ typedef struct entity_attributes { char *platform_type; char *cluster_name; @@ -200,7 +206,8 @@ struct flb_cloudwatch { /* Plugin output instance reference */ struct flb_output_instance *ins; - /* Checks if kubernete filter is enabled + /* + * Checks if kubernete filter is enabled * So the plugin knows when to scrape for Entity */ From 05eaaca037c52944917408f0ed92d7d39c7c52e5 Mon Sep 17 00:00:00 2001 From: Zhihong Lin Date: Thu, 31 Jul 2025 14:15:56 -0400 Subject: [PATCH 3/6] out_cloudwatch: use record accessor for parsing entity values Signed-off-by: Zhihong Lin --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 212 ++++++++----------- 1 file changed, 87 insertions(+), 125 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index f97bf21e93b..60fa09f1ceb 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -41,6 +41,7 @@ #include #include #include +#include "fluent-bit/flb_ra_key.h" #include #include @@ -52,6 +53,7 @@ #include "cloudwatch_api.h" + #define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException" #define ERR_CODE_NOT_FOUND "ResourceNotFoundException" @@ -1108,141 +1110,101 @@ static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entit return NULL; } +/* + * Entity fields can change during stream lifecycle due to service name + * changes. The found_flag ensures filter_count accurately reflects + * which fields need filtering, preventing aws_entity fields from remaining + * in log messages when fallback values are used. + */ +static void set_entity_field(char **field, struct flb_ra_value *val, + int *filter_count, int *found_flag) +{ + if (!val || val->type != FLB_RA_STRING) { + return; + } + + if (found_flag && !*found_flag) { + if (filter_count) { + (*filter_count)++; + } + (*found_flag)++; + } + else if (!found_flag && *field == NULL && filter_count) { + (*filter_count)++; + } + + if (*field) { + flb_free(*field); + } + + if (val->storage == FLB_RA_REF) { + *field = flb_strndup(val->val.ref.buf, val->val.ref.len); + } + else { + *field = flb_strndup(val->val.string, flb_sds_len(val->val.string)); + } +} + void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map, int map_size) { - int i,j; - msgpack_object key, kube_key; - msgpack_object val, kube_val; - - int val_map_size; - for(i=0; i < map_size; i++) { - key = map.via.map.ptr[i].key; - val = map.via.map.ptr[i].val; - if(strncmp(key.via.str.ptr, "kubernetes",10 ) == 0 ) { - if (val.type == MSGPACK_OBJECT_MAP) { - val_map_size = val.via.map.size; - for (j=0; j < val_map_size; j++) { - kube_key = val.via.map.ptr[j].key; - kube_val = val.via.map.ptr[j].val; - if(strncmp(kube_key.via.str.ptr, - "aws_entity_service_name", kube_key.via.str.size) == 0) { - if(!entity->service_name_found) { - entity->filter_count++; - entity->service_name_found++; - } - if(entity->key_attributes->name != NULL) { - flb_free(entity->key_attributes->name); - } - entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", - kube_key.via.str.size) == 0) { - if(!entity->environment_found) { - entity->filter_count++; - entity->environment_found++; - } - if(entity->key_attributes->environment != NULL) { - flb_free(entity->key_attributes->environment); - } - entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "namespace_name", - kube_key.via.str.size) == 0) { - if(entity->attributes->namespace != NULL) { - flb_free(entity->attributes->namespace); - } - entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "host", - kube_key.via.str.size) == 0) { - if(entity->attributes->node != NULL) { - flb_free(entity->attributes->node); - } - entity->attributes->node = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "aws_entity_cluster", - kube_key.via.str.size) == 0) { - if(entity->attributes->cluster_name == NULL) { - entity->filter_count++; - } - else { - flb_free(entity->attributes->cluster_name); - } - entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "aws_entity_workload", - kube_key.via.str.size) == 0) { - if(entity->attributes->workload == NULL) { - entity->filter_count++; - } - else { - flb_free(entity->attributes->workload); - } - entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", - kube_key.via.str.size) == 0) { - if(!entity->name_source_found) { - entity->filter_count++; - entity->name_source_found++; - } - if(entity->attributes->name_source != NULL) { - flb_free(entity->attributes->name_source); - } - entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - else if(strncmp(kube_key.via.str.ptr, "aws_entity_platform", - kube_key.via.str.size) == 0) { - if(entity->attributes->platform_type == NULL) { - entity->filter_count++; - } - else { - flb_free(entity->attributes->platform_type); - } - entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, - kube_val.via.str.size); - } - } - } - } - if(strncmp(key.via.str.ptr, "aws_entity_ec2_instance_id", - key.via.str.size ) == 0 ) { - if(entity->attributes->instance_id == NULL) { - entity->root_filter_count++; - } - else { - flb_free(entity->attributes->instance_id); - } - entity->attributes->instance_id = flb_strndup(val.via.str.ptr, - val.via.str.size); + struct flb_record_accessor *ra; + struct flb_ra_value *val; + int i; + + struct { + const char *path; + char **field; + int *filter_count; + int *found_flag; + } field_map[] = { + {"$kubernetes['aws_entity_service_name']", &entity->key_attributes->name, + &entity->filter_count, &entity->service_name_found}, + {"$kubernetes['aws_entity_environment']", &entity->key_attributes->environment, + &entity->filter_count, &entity->environment_found}, + {"$kubernetes['namespace_name']", &entity->attributes->namespace, + NULL, NULL}, + {"$kubernetes['host']", &entity->attributes->node, NULL, NULL}, + {"$kubernetes['aws_entity_cluster']", &entity->attributes->cluster_name, + &entity->filter_count, NULL}, + {"$kubernetes['aws_entity_workload']", &entity->attributes->workload, + &entity->filter_count, NULL}, + {"$kubernetes['aws_entity_name_source']", &entity->attributes->name_source, + &entity->filter_count, &entity->name_source_found}, + {"$kubernetes['aws_entity_platform']", &entity->attributes->platform_type, + &entity->filter_count, NULL}, + {"$aws_entity_ec2_instance_id", &entity->attributes->instance_id, + &entity->root_filter_count, NULL}, + {"$aws_entity_account_id", &entity->key_attributes->account_id, + &entity->root_filter_count, NULL}, + {NULL, NULL, NULL, NULL} + }; + + for (i = 0; field_map[i].path; i++) { + ra = flb_ra_create(field_map[i].path, FLB_FALSE); + if (!ra) { + continue; } - if(strncmp(key.via.str.ptr, "aws_entity_account_id",key.via.str.size ) == 0 ) { - if(entity->key_attributes->account_id == NULL) { - entity->root_filter_count++; - } - else { - flb_free(entity->key_attributes->account_id); - } - entity->key_attributes->account_id = flb_strndup(val.via.str.ptr, - val.via.str.size); + + val = flb_ra_get_value_object(ra, map); + if (val) { + set_entity_field(field_map[i].field, val, field_map[i].filter_count, + field_map[i].found_flag); + flb_ra_key_value_destroy(val); } + + flb_ra_destroy(ra); } - if(entity->key_attributes->name == NULL && - entity->attributes->name_source == NULL && - entity->attributes->workload != NULL) { + + if (entity->key_attributes->name == NULL && + entity->attributes->name_source == NULL && + entity->attributes->workload != NULL) { entity->key_attributes->name = flb_strndup(entity->attributes->workload, strlen(entity->attributes->workload)); entity->attributes->name_source = flb_strndup("K8sWorkload", 11); } - if(entity->key_attributes->environment == NULL) { + + if (entity->key_attributes->environment == NULL) { entity->key_attributes->environment = find_fallback_environment(ctx, entity); } } From 533b33e55ca665decc07c920d7623aaff21d866a Mon Sep 17 00:00:00 2001 From: Zhihong Lin Date: Thu, 31 Jul 2025 14:36:26 -0400 Subject: [PATCH 4/6] out_cloudwatch: address memory leaks on allocation failures Signed-off-by: Zhihong Lin --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 60fa09f1ceb..dbc19ac8b96 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -1221,12 +1221,17 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stre stream->entity->key_attributes = flb_malloc(sizeof(entity_key_attributes)); if (stream->entity->key_attributes == NULL) { + flb_free(stream->entity); + stream->entity = NULL; return; } memset(stream->entity->key_attributes, 0, sizeof(entity_key_attributes)); stream->entity->attributes = flb_malloc(sizeof(entity_attributes)); if (stream->entity->attributes == NULL) { + flb_free(stream->entity->key_attributes); + flb_free(stream->entity); + stream->entity = NULL; return; } memset(stream->entity->attributes, 0, sizeof(entity_attributes)); From eb5a46a05e9a1407cca2b47c16c393ccf81dcf4b Mon Sep 17 00:00:00 2001 From: Zhihong Lin Date: Thu, 31 Jul 2025 14:39:58 -0400 Subject: [PATCH 5/6] out_cloudwatch: address inconsistent cloudwatch stream reference Signed-off-by: Zhihong Lin --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index dbc19ac8b96..f128c1617f1 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -324,7 +324,7 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu if(stream->entity->attributes->node != NULL && strlen(stream->entity->attributes->node) != 0) { if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"K8s.Node\":\"",buf->current_stream->entity->attributes->node,"\"")) { + "\"K8s.Node\":\"",stream->entity->attributes->node,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -334,7 +334,7 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) { if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) { + "\"K8s.Workload\":\"",stream->entity->attributes->workload,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -344,7 +344,7 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) { if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) { + "\"EC2.InstanceId\":\"",stream->entity->attributes->instance_id,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -354,7 +354,7 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu if(stream->entity->attributes->name_source != NULL && strlen(stream->entity->attributes->name_source) != 0) { if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"AWS.ServiceNameSource\":\"",buf->current_stream->entity->attributes->name_source,"\"")) { + "\"AWS.ServiceNameSource\":\"",stream->entity->attributes->name_source,"\"")) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { From f2f61a7e8004fc3f895528c3e5436652810ed0bb Mon Sep 17 00:00:00 2001 From: Zhihong Lin Date: Thu, 31 Jul 2025 15:39:50 -0400 Subject: [PATCH 6/6] out_cloudwatch: address insufficient snprintf checks Signed-off-by: Zhihong Lin --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 54 ++++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index f128c1617f1..d8a441f6494 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -215,8 +215,8 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush } if(stream->entity->key_attributes->name != NULL && strlen(stream->entity->key_attributes->name) != 0) { - if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"Name\":\"",stream->entity->key_attributes->name,"\"")) { + if (snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"Name\":\"",stream->entity->key_attributes->name,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -225,8 +225,8 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush } if(stream->entity->key_attributes->environment != NULL && strlen(stream->entity->key_attributes->environment) != 0) { - if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"Environment\":\"",stream->entity->key_attributes->environment,"\"")) { + if (snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"Environment\":\"",stream->entity->key_attributes->environment,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -235,8 +235,8 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush } if(stream->entity->key_attributes->account_id != NULL && strlen(stream->entity->key_attributes->account_id) != 0) { - if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"")) { + if (snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -264,8 +264,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu if (stream->entity->attributes->platform_type != NULL && strlen(stream->entity->attributes->platform_type) != 0) { if (strcmp(stream->entity->attributes->platform_type, "eks") == 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", - "\"PlatformType\":\"","AWS::EKS","\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","AWS::EKS","\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -273,8 +273,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -283,8 +283,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } } else if (strcmp(stream->entity->attributes->platform_type, "k8s") == 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", - "\"PlatformType\":\"","K8s","\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","K8s","\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -292,8 +292,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -303,8 +303,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } } else { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", - "\"PlatformType\":\"","Generic","\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","Generic","\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -313,8 +313,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->namespace != NULL && strlen(stream->entity->attributes->namespace) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -323,8 +323,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->node != NULL && strlen(stream->entity->attributes->node) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"K8s.Node\":\"",stream->entity->attributes->node,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Node\":\"",stream->entity->attributes->node,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -333,8 +333,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"K8s.Workload\":\"",stream->entity->attributes->workload,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Workload\":\"",stream->entity->attributes->workload,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -343,8 +343,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"EC2.InstanceId\":\"",stream->entity->attributes->instance_id,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"EC2.InstanceId\":\"",stream->entity->attributes->instance_id,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -353,8 +353,8 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu } if(stream->entity->attributes->name_source != NULL && strlen(stream->entity->attributes->name_source) != 0) { - if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", - "\"AWS.ServiceNameSource\":\"",stream->entity->attributes->name_source,"\"")) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"AWS.ServiceNameSource\":\"",stream->entity->attributes->name_source,"\"") < 0) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { @@ -457,7 +457,7 @@ static int write_event(struct flb_cloudwatch *ctx, struct cw_flush *buf, { char ts[50]; - if (!snprintf(ts, 50, "%llu", event->timestamp)) { + if (snprintf(ts, 50, "%llu", event->timestamp) < 0) { goto error; }