@@ -119,9 +119,9 @@ func (c *Controllor) initRecvs(env string) []recvs.AcceptorRecvItf {
119
119
Topics : []string {utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".topics." + env )},
120
120
Group : utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".groups." + env ),
121
121
Tag : utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".tags." + env ),
122
- IsJsonFormat : utils .Settings .GetBool ("settings.acceptor.recvs.plugins." + name + ".is_json_format" ),
122
+ IsJSONFormat : utils .Settings .GetBool ("settings.acceptor.recvs.plugins." + name + ".is_json_format" ),
123
123
TagKey : utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".tag_key" ),
124
- JsonTagKey : utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".json_tag_key" ),
124
+ JSONTagKey : utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".json_tag_key" ),
125
125
RewriteTag : recvs .GetKafkaRewriteTag (utils .Settings .GetString ("settings.acceptor.recvs.plugins." + name + ".rewrite_tag" ), env ),
126
126
NConsumer : utils .Settings .GetInt ("settings.acceptor.recvs.plugins." + name + ".nconsumer" ),
127
127
KafkaCommitCfg : & recvs.KafkaCommitCfg {
@@ -311,14 +311,25 @@ func (c *Controllor) initPostPipeline(env string, waitCommitChan chan<- int64) *
311
311
switch utils .Settings .GetString ("settings.post_filters.plugins." + name + ".type" ) {
312
312
case "es-dispatcher" :
313
313
fs = append (fs , postFilters .NewESDispatcherFilter (& postFilters.ESDispatcherFilterCfg {
314
- Tags : libs .LoadTagsAppendEnv (env , utils .Settings .GetStringSlice ("settings.post_filters.plugins.es_dispatcher .tags" )),
315
- TagKey : utils .Settings .GetString ("settings.post_filters.plugins.es_dispatcher .tag_key" ),
316
- ReTagMap : postFilters .LoadReTagMap (env , utils .Settings .Get ("settings.post_filters.plugins.es_dispatcher .rewrite_tag_map" )),
314
+ Tags : libs .LoadTagsAppendEnv (env , utils .Settings .GetStringSlice ("settings.post_filters.plugins." + name + " .tags" )),
315
+ TagKey : utils .Settings .GetString ("settings.post_filters.plugins." + name + " .tag_key" ),
316
+ ReTagMap : postFilters .LoadReTagMap (env , utils .Settings .Get ("settings.post_filters.plugins." + name + " .rewrite_tag_map" )),
317
317
}))
318
318
case "tag-rewriter" :
319
319
fs = append (fs , postFilters .NewForwardTagRewriterFilter (& postFilters.ForwardTagRewriterFilterCfg { // wechat mini program
320
- Tag : utils .Settings .GetString ("settings.post_filters.plugins.forward_tag_rewriter.tag" ) + "." + env ,
321
- TagKey : utils .Settings .GetString ("settings.post_filters.plugins.forward_tag_rewriter.tag_key" ),
320
+ Tag : utils .Settings .GetString ("settings.post_filters.plugins." + name + ".tag" ) + "." + env ,
321
+ TagKey : utils .Settings .GetString ("settings.post_filters.plugins." + name + ".tag_key" ),
322
+ }))
323
+ case "fields" :
324
+ fs = append (fs , postFilters .NewFieldsFilter (& postFilters.FieldsFilterCfg {
325
+ Tags : libs .LoadTagsAppendEnv (env , utils .Settings .GetStringSlice ("settings.post_filters.plugins." + name + ".tags" )),
326
+ IncludeFields : utils .Settings .GetStringSlice ("settings.post_filters.plugins." + name + ".include_fields" ),
327
+ ExcludeFields : utils .Settings .GetStringSlice ("settings.post_filters.plugins." + name + ".exclude_fields" ),
328
+ NewFieldTemplates : utils .Settings .GetStringMapString ("settings.post_filters.plugins." + name + ".new_fields" ),
329
+ }))
330
+ case "custom-bigdata" :
331
+ fs = append (fs , postFilters .NewCustomBigDataFilter (& postFilters.CustomBigDataFilterCfg {
332
+ Tags : libs .LoadTagsAppendEnv (env , utils .Settings .GetStringSlice ("settings.post_filters.plugins." + name + ".tags" )),
322
333
}))
323
334
default :
324
335
utils .Logger .Panic ("unknown post_filter type" ,
@@ -336,6 +347,13 @@ func (c *Controllor) initPostPipeline(env string, waitCommitChan chan<- int64) *
336
347
utils .Logger .Panic ("post_filter configuration error" )
337
348
}
338
349
350
+ fs = append (fs ,
351
+ postFilters .NewDefaultFilter (& postFilters.DefaultFilterCfg {
352
+ MsgKey : utils .Settings .GetString ("settings.post_filters.plugins.default.msg_key" ),
353
+ MaxLen : utils .Settings .GetInt ("settings.post_filters.plugins.default.max_len" ),
354
+ }),
355
+ )
356
+
339
357
return postFilters .NewPostPipeline (& postFilters.PostPipelineCfg {
340
358
MsgPool : c .msgPool ,
341
359
CommittedChan : waitCommitChan ,
0 commit comments