Skip to content

Commit 9bb75a9

Browse files
committed
out_cloudwatch: use record accessor for parsing entity values
Signed-off-by: Zhihong Lin <zhiholin@amazon.com>
1 parent aeee49c commit 9bb75a9

File tree

1 file changed

+86
-125
lines changed

1 file changed

+86
-125
lines changed

plugins/out_cloudwatch_logs/cloudwatch_api.c

Lines changed: 86 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <fluent-bit/flb_utils.h>
4242
#include <fluent-bit/flb_intermediate_metric.h>
4343
#include <fluent-bit/flb_metrics.h>
44+
#include "fluent-bit/flb_ra_key.h"
4445

4546
#include <monkey/mk_core.h>
4647
#include <msgpack.h>
@@ -52,6 +53,7 @@
5253

5354
#include "cloudwatch_api.h"
5455

56+
5557
#define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException"
5658
#define ERR_CODE_NOT_FOUND "ResourceNotFoundException"
5759

@@ -1108,141 +1110,100 @@ static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entit
11081110
return NULL;
11091111
}
11101112

1113+
/*
1114+
* Entity fields can change during stream lifecycle due to service name
1115+
* changes. The found_flag ensures filter_count accurately reflects
1116+
* which fields need filtering, preventing aws_entity fields from remaining
1117+
* in log messages when fallback values are used.
1118+
*/
1119+
static void set_entity_field(char **field, struct flb_ra_value *val,
1120+
int *filter_count, int *found_flag)
1121+
{
1122+
if (!val || val->type != FLB_RA_STRING) {
1123+
return;
1124+
}
1125+
1126+
if (found_flag && !*found_flag) {
1127+
if (filter_count) {
1128+
(*filter_count)++;
1129+
}
1130+
(*found_flag)++;
1131+
}
1132+
else if (!found_flag && *field == NULL && filter_count) {
1133+
(*filter_count)++;
1134+
}
1135+
1136+
if (*field) {
1137+
flb_free(*field);
1138+
}
1139+
1140+
if (val->storage == FLB_RA_REF) {
1141+
*field = flb_strndup(val->val.ref.buf, val->val.ref.len);
1142+
}
1143+
else {
1144+
*field = flb_strndup(val->val.string, flb_sds_len(val->val.string));
1145+
}
1146+
}
1147+
11111148
void parse_entity(struct flb_cloudwatch *ctx, entity *entity,
11121149
msgpack_object map, int map_size)
11131150
{
1114-
int i,j;
1115-
msgpack_object key, kube_key;
1116-
msgpack_object val, kube_val;
1117-
1118-
int val_map_size;
1119-
for(i=0; i < map_size; i++) {
1120-
key = map.via.map.ptr[i].key;
1121-
val = map.via.map.ptr[i].val;
1122-
if(strncmp(key.via.str.ptr, "kubernetes",10 ) == 0 ) {
1123-
if (val.type == MSGPACK_OBJECT_MAP) {
1124-
val_map_size = val.via.map.size;
1125-
for (j=0; j < val_map_size; j++) {
1126-
kube_key = val.via.map.ptr[j].key;
1127-
kube_val = val.via.map.ptr[j].val;
1128-
if(strncmp(kube_key.via.str.ptr,
1129-
"aws_entity_service_name", kube_key.via.str.size) == 0) {
1130-
if(!entity->service_name_found) {
1131-
entity->filter_count++;
1132-
entity->service_name_found++;
1133-
}
1134-
if(entity->key_attributes->name != NULL) {
1135-
flb_free(entity->key_attributes->name);
1136-
}
1137-
entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr,
1138-
kube_val.via.str.size);
1139-
}
1140-
else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment",
1141-
kube_key.via.str.size) == 0) {
1142-
if(!entity->environment_found) {
1143-
entity->filter_count++;
1144-
entity->environment_found++;
1145-
}
1146-
if(entity->key_attributes->environment != NULL) {
1147-
flb_free(entity->key_attributes->environment);
1148-
}
1149-
entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr,
1150-
kube_val.via.str.size);
1151-
}
1152-
else if(strncmp(kube_key.via.str.ptr, "namespace_name",
1153-
kube_key.via.str.size) == 0) {
1154-
if(entity->attributes->namespace != NULL) {
1155-
flb_free(entity->attributes->namespace);
1156-
}
1157-
entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr,
1158-
kube_val.via.str.size);
1159-
}
1160-
else if(strncmp(kube_key.via.str.ptr, "host",
1161-
kube_key.via.str.size) == 0) {
1162-
if(entity->attributes->node != NULL) {
1163-
flb_free(entity->attributes->node);
1164-
}
1165-
entity->attributes->node = flb_strndup(kube_val.via.str.ptr,
1166-
kube_val.via.str.size);
1167-
}
1168-
else if(strncmp(kube_key.via.str.ptr, "aws_entity_cluster",
1169-
kube_key.via.str.size) == 0) {
1170-
if(entity->attributes->cluster_name == NULL) {
1171-
entity->filter_count++;
1172-
}
1173-
else {
1174-
flb_free(entity->attributes->cluster_name);
1175-
}
1176-
entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr,
1177-
kube_val.via.str.size);
1178-
}
1179-
else if(strncmp(kube_key.via.str.ptr, "aws_entity_workload",
1180-
kube_key.via.str.size) == 0) {
1181-
if(entity->attributes->workload == NULL) {
1182-
entity->filter_count++;
1183-
}
1184-
else {
1185-
flb_free(entity->attributes->workload);
1186-
}
1187-
entity->attributes->workload = flb_strndup(kube_val.via.str.ptr,
1188-
kube_val.via.str.size);
1189-
}
1190-
else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source",
1191-
kube_key.via.str.size) == 0) {
1192-
if(!entity->name_source_found) {
1193-
entity->filter_count++;
1194-
entity->name_source_found++;
1195-
}
1196-
if(entity->attributes->name_source != NULL) {
1197-
flb_free(entity->attributes->name_source);
1198-
}
1199-
entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr,
1200-
kube_val.via.str.size);
1201-
}
1202-
else if(strncmp(kube_key.via.str.ptr, "aws_entity_platform",
1203-
kube_key.via.str.size) == 0) {
1204-
if(entity->attributes->platform_type == NULL) {
1205-
entity->filter_count++;
1206-
}
1207-
else {
1208-
flb_free(entity->attributes->platform_type);
1209-
}
1210-
entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr,
1211-
kube_val.via.str.size);
1212-
}
1213-
}
1214-
}
1215-
}
1216-
if(strncmp(key.via.str.ptr, "aws_entity_ec2_instance_id",
1217-
key.via.str.size ) == 0 ) {
1218-
if(entity->attributes->instance_id == NULL) {
1219-
entity->root_filter_count++;
1220-
}
1221-
else {
1222-
flb_free(entity->attributes->instance_id);
1223-
}
1224-
entity->attributes->instance_id = flb_strndup(val.via.str.ptr,
1225-
val.via.str.size);
1151+
struct flb_record_accessor *ra;
1152+
struct flb_ra_value *val;
1153+
1154+
struct {
1155+
const char *path;
1156+
char **field;
1157+
int *filter_count;
1158+
int *found_flag;
1159+
} field_map[] = {
1160+
{"$kubernetes['aws_entity_service_name']", &entity->key_attributes->name,
1161+
&entity->filter_count, &entity->service_name_found},
1162+
{"$kubernetes['aws_entity_environment']", &entity->key_attributes->environment,
1163+
&entity->filter_count, &entity->environment_found},
1164+
{"$kubernetes['namespace_name']", &entity->attributes->namespace,
1165+
NULL, NULL},
1166+
{"$kubernetes['host']", &entity->attributes->node, NULL, NULL},
1167+
{"$kubernetes['aws_entity_cluster']", &entity->attributes->cluster_name,
1168+
&entity->filter_count, NULL},
1169+
{"$kubernetes['aws_entity_workload']", &entity->attributes->workload,
1170+
&entity->filter_count, NULL},
1171+
{"$kubernetes['aws_entity_name_source']", &entity->attributes->name_source,
1172+
&entity->filter_count, &entity->name_source_found},
1173+
{"$kubernetes['aws_entity_platform']", &entity->attributes->platform_type,
1174+
&entity->filter_count, NULL},
1175+
{"$aws_entity_ec2_instance_id", &entity->attributes->instance_id,
1176+
&entity->root_filter_count, NULL},
1177+
{"$aws_entity_account_id", &entity->key_attributes->account_id,
1178+
&entity->root_filter_count, NULL},
1179+
{NULL, NULL, NULL, NULL}
1180+
};
1181+
1182+
for (int i = 0; field_map[i].path; i++) {
1183+
ra = flb_ra_create(field_map[i].path, FLB_FALSE);
1184+
if (!ra) {
1185+
continue;
12261186
}
1227-
if(strncmp(key.via.str.ptr, "aws_entity_account_id",key.via.str.size ) == 0 ) {
1228-
if(entity->key_attributes->account_id == NULL) {
1229-
entity->root_filter_count++;
1230-
}
1231-
else {
1232-
flb_free(entity->key_attributes->account_id);
1233-
}
1234-
entity->key_attributes->account_id = flb_strndup(val.via.str.ptr,
1235-
val.via.str.size);
1187+
1188+
val = flb_ra_get_value_object(ra, map);
1189+
if (val) {
1190+
set_entity_field(field_map[i].field, val, field_map[i].filter_count,
1191+
field_map[i].found_flag);
1192+
flb_ra_key_value_destroy(val);
12361193
}
1194+
1195+
flb_ra_destroy(ra);
12371196
}
1238-
if(entity->key_attributes->name == NULL &&
1239-
entity->attributes->name_source == NULL &&
1240-
entity->attributes->workload != NULL) {
1197+
1198+
if (entity->key_attributes->name == NULL &&
1199+
entity->attributes->name_source == NULL &&
1200+
entity->attributes->workload != NULL) {
12411201
entity->key_attributes->name = flb_strndup(entity->attributes->workload,
12421202
strlen(entity->attributes->workload));
12431203
entity->attributes->name_source = flb_strndup("K8sWorkload", 11);
12441204
}
1245-
if(entity->key_attributes->environment == NULL) {
1205+
1206+
if (entity->key_attributes->environment == NULL) {
12461207
entity->key_attributes->environment = find_fallback_environment(ctx, entity);
12471208
}
12481209
}

0 commit comments

Comments
 (0)