Skip to content

Commit 9329d26

Browse files
committed
ci: release v1.8.9
1 parent f9a82e5 commit 9329d26

File tree

12 files changed

+220
-67
lines changed

12 files changed

+220
-67
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,5 @@ cpu.pprof
114114

115115
# temporary
116116
# docs
117+
118+
coverage.txt

.travis.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
language: go
2+
3+
sudo: false
4+
5+
go:
6+
- 1.11.x
7+
- 1.12.x
8+
9+
git:
10+
depth: 1
11+
12+
cache:
13+
directories:
14+
- vendor
15+
- $GOPATH/src
16+
17+
18+
script:
19+
- git checkout $TRAVIS_COMMIT . # travis.ci will overwrite to master branch
20+
- go test -race -coverprofile=coverage.txt -covermode=atomic ./...
21+
# - go test -bench ./...
22+
23+
after_script:
24+
- bash <(curl -s https://codecov.io/bash)

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,21 @@
22
*CURRENT*
33
---
44

5+
- 2019-05-28 (Laisky) fix: recvs check active env
6+
- 2019-05-28 (Laisky) feat: add null sender
7+
- 2019-05-27 (Laisky) ci: upgrade go-utils
8+
- 2019-05-27 (Laisky) docs: update readme
59

10+
*v1.8.8*
11+
---
12+
13+
- 2019-05-24 (Laisky) fix(paas-357): journal memory leak
14+
15+
*v1.8.7*
16+
---
17+
18+
- 2019-05-23 (Laisky) ci: update gomod
19+
- 2019-05-23 (Laisky) docs: update readme
620

721
*v1.8.6*
822
---

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
Rewrite fluentd-server by Golang, Higher performance with less resource requirement.
44

5-
[![pipeline status](http://gitlab.pateo.com.cn:10080/PaaS/go-fluentd/badges/master/pipeline.svg)](http://gitlab.pateo.com.cn:10080/PaaS/go-fluentd/commits/master)
6-
5+
![GitHub release](https://img.shields.io/github/release/Laisky/go-fluentd.svg)
6+
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
7+
[![Build Status](https://travis-ci.org/Laisky/go-fluentd.svg?branch=master)](https://travis-ci.org/Laisky/go-fluentd)
8+
[![codecov](https://codecov.io/gh/Laisky/go-fluentd/branch/master/graph/badge.svg)](https://codecov.io/gh/Laisky/go-fluentd)
79
[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/)
810
[![Go Report Card](https://goreportcard.com/badge/github.com/Laisky/go-fluentd)](https://goreportcard.com/report/github.com/Laisky/go-fluentd)
911
[![GoDoc](https://godoc.org/github.com/Laisky/go-fluentd?status.svg)](https://godoc.org/github.com/Laisky/go-fluentd)

acceptor.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
"github.com/Laisky/zap"
1111
)
1212

13+
// AcceptorCfg is the configuation of Acceptor
1314
type AcceptorCfg struct {
1415
MsgPool *sync.Pool
1516
Journal *Journal
1617
AsyncOutChanSize, SyncOutChanSize int
17-
MaxRotateId int64
18+
MaxRotateID int64
1819
}
1920

2021
// Acceptor listening tcp connection, and decode messages
@@ -40,12 +41,12 @@ func NewAcceptor(cfg *AcceptorCfg, recvs ...recvs.AcceptorRecvItf) *Acceptor {
4041
func (a *Acceptor) Run() {
4142
// got exists max id from legacy
4243
utils.Logger.Info("process legacy data...")
43-
maxId, err := a.Journal.LoadMaxId()
44+
maxID, err := a.Journal.LoadMaxID()
4445
if err != nil {
4546
utils.Logger.Panic("try to process legacy messages got error", zap.Error(err))
4647
}
4748

48-
couter, err := utils.NewRotateCounterFromN(maxId+1, a.MaxRotateId)
49+
couter, err := utils.NewRotateCounterFromN(maxID+1, a.MaxRotateID)
4950
if err != nil {
5051
panic(fmt.Errorf("try to create counter got error: %+v", err))
5152
}

controllor.go

Lines changed: 63 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,19 @@ func (c *Controllor) initRecvs(env string) []recvs.AcceptorRecvItf {
6363
switch utils.Settings.Get("settings.acceptor.recvs.plugins").(type) {
6464
case map[string]interface{}:
6565
for name := range utils.Settings.Get("settings.acceptor.recvs.plugins").(map[string]interface{}) {
66+
if !StringListContains(utils.Settings.GetStringSlice("settings.acceptor.recvs.plugins."+name+".active_env"), env) {
67+
utils.Logger.Info("recv not support current env", zap.String("name", name), zap.String("env", env))
68+
continue
69+
}
70+
6671
switch utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".type") {
6772
case "fluentd":
68-
if StringListContains(utils.Settings.GetStringSlice("settings.acceptor.recvs.plugins."+name+".active_env"), env) {
69-
receivers = append(receivers, recvs.NewFluentdRecv(&recvs.FluentdRecvCfg{
70-
Name: name,
71-
Addr: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".addr"),
72-
TagKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tag_key"),
73-
IsRewriteTagFromTagKey: utils.Settings.GetBool("settings.acceptor.recvs.plugins." + name + ".is_rewrite_tag_from_tag_key"),
74-
}))
75-
}
73+
receivers = append(receivers, recvs.NewFluentdRecv(&recvs.FluentdRecvCfg{
74+
Name: name,
75+
Addr: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".addr"),
76+
TagKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tag_key"),
77+
IsRewriteTagFromTagKey: utils.Settings.GetBool("settings.acceptor.recvs.plugins." + name + ".is_rewrite_tag_from_tag_key"),
78+
}))
7679
case "rsyslog":
7780
receivers = append(receivers, recvs.NewRsyslogRecv(&recvs.RsyslogCfg{
7881
Name: name,
@@ -147,7 +150,7 @@ func (c *Controllor) initAcceptor(journal *Journal, receivers []recvs.AcceptorRe
147150
acceptor := NewAcceptor(&AcceptorCfg{
148151
MsgPool: c.msgPool,
149152
Journal: journal,
150-
MaxRotateId: utils.Settings.GetInt64("settings.acceptor.max_rotate_id"),
153+
MaxRotateID: utils.Settings.GetInt64("settings.acceptor.max_rotate_id"),
151154
AsyncOutChanSize: utils.Settings.GetInt("settings.acceptor.async_out_chan_size"),
152155
SyncOutChanSize: utils.Settings.GetInt("settings.acceptor.sync_out_chan_size"),
153156
},
@@ -354,57 +357,64 @@ func StringListContains(ls []string, v string) bool {
354357

355358
func (c *Controllor) initSenders(env string) []senders.SenderItf {
356359
ss := []senders.SenderItf{}
357-
358360
switch utils.Settings.Get("settings.producer.plugins").(type) {
359361
case map[string]interface{}:
360362
for name := range utils.Settings.Get("settings.producer.plugins").(map[string]interface{}) {
363+
if !StringListContains(utils.Settings.GetStringSlice("settings.producer.plugins."+name+".active_env"), env) {
364+
utils.Logger.Info("sender not support current env", zap.String("name", name), zap.String("env", env))
365+
continue
366+
}
367+
361368
switch utils.Settings.GetString("settings.producer.plugins." + name + ".type") {
362369
case "fluentd":
363-
if StringListContains(utils.Settings.GetStringSlice("settings.producer.plugins."+name+".active_env"), env) {
364-
ss = append(ss, senders.NewFluentSender(&senders.FluentSenderCfg{
365-
Name: name,
366-
Addr: utils.Settings.GetString("settings.producer.plugins." + name + ".addr"),
367-
BatchSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".msg_batch_size"),
368-
MaxWait: utils.Settings.GetDuration("settings.producer.plugins."+name+".max_wait_sec") * time.Second,
369-
RetryChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".retry_chan_len"),
370-
InChanSize: utils.Settings.GetInt("settings.producer.sender_inchan_size"),
371-
NFork: utils.Settings.GetInt("settings.producer.plugins." + name + ".forks"),
372-
Tags: utils.Settings.GetStringSlice("settings.producer.plugins." + name + ".tags"), // do not append env
373-
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
374-
}))
375-
}
370+
ss = append(ss, senders.NewFluentSender(&senders.FluentSenderCfg{
371+
Name: name,
372+
Addr: utils.Settings.GetString("settings.producer.plugins." + name + ".addr"),
373+
BatchSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".msg_batch_size"),
374+
MaxWait: utils.Settings.GetDuration("settings.producer.plugins."+name+".max_wait_sec") * time.Second,
375+
RetryChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".retry_chan_len"),
376+
InChanSize: utils.Settings.GetInt("settings.producer.sender_inchan_size"),
377+
NFork: utils.Settings.GetInt("settings.producer.plugins." + name + ".forks"),
378+
Tags: utils.Settings.GetStringSlice("settings.producer.plugins." + name + ".tags"), // do not append env
379+
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
380+
}))
376381
case "kafka":
377-
if StringListContains(utils.Settings.GetStringSlice("settings.producer.plugins."+name+".active_env"), env) {
378-
ss = append(ss, senders.NewKafkaSender(&senders.KafkaSenderCfg{
379-
Name: name,
380-
Brokers: utils.Settings.GetStringSlice("settings.producer.plugins." + name + ".brokers." + env),
381-
Topic: utils.Settings.GetString("settings.producer.plugins." + name + ".topic"),
382-
TagKey: utils.Settings.GetString("settings.producer.plugins." + name + ".tag_key"),
383-
BatchSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".msg_batch_size"),
384-
MaxWait: utils.Settings.GetDuration("settings.producer.plugins."+name+".max_wait_sec") * time.Second,
385-
RetryChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".retry_chan_len"),
386-
InChanSize: utils.Settings.GetInt("settings.producer.sender_inchan_size"),
387-
NFork: utils.Settings.GetInt("settings.producer.plugins." + name + ".forks"),
388-
Tags: libs.LoadTagsAppendEnv(env, utils.Settings.GetStringSlice("settings.producer.plugins."+name+".tags")),
389-
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
390-
}))
391-
}
382+
ss = append(ss, senders.NewKafkaSender(&senders.KafkaSenderCfg{
383+
Name: name,
384+
Brokers: utils.Settings.GetStringSlice("settings.producer.plugins." + name + ".brokers." + env),
385+
Topic: utils.Settings.GetString("settings.producer.plugins." + name + ".topic"),
386+
TagKey: utils.Settings.GetString("settings.producer.plugins." + name + ".tag_key"),
387+
BatchSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".msg_batch_size"),
388+
MaxWait: utils.Settings.GetDuration("settings.producer.plugins."+name+".max_wait_sec") * time.Second,
389+
RetryChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".retry_chan_len"),
390+
InChanSize: utils.Settings.GetInt("settings.producer.sender_inchan_size"),
391+
NFork: utils.Settings.GetInt("settings.producer.plugins." + name + ".forks"),
392+
Tags: libs.LoadTagsAppendEnv(env, utils.Settings.GetStringSlice("settings.producer.plugins."+name+".tags")),
393+
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
394+
}))
392395
case "es":
393-
if StringListContains(utils.Settings.GetStringSlice("settings.producer.plugins."+name+".active_env"), env) {
394-
ss = append(ss, senders.NewElasticSearchSender(&senders.ElasticSearchSenderCfg{
395-
Name: name,
396-
BatchSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".msg_batch_size"),
397-
Addr: utils.Settings.GetString("settings.producer.plugins." + name + ".addr"),
398-
MaxWait: utils.Settings.GetDuration("settings.producer.plugins."+name+".max_wait_sec") * time.Second,
399-
RetryChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".retry_chan_len"),
400-
InChanSize: utils.Settings.GetInt("settings.producer.sender_inchan_size"),
401-
NFork: utils.Settings.GetInt("settings.producer.plugins." + name + ".forks"),
402-
TagKey: utils.Settings.GetString("settings.producer.plugins." + name + ".tag_key"),
403-
Tags: libs.LoadTagsAppendEnv(env, utils.Settings.GetStringSlice("settings.producer.plugins."+name+".tags")),
404-
TagIndexMap: senders.LoadESTagIndexMap(env, utils.Settings.Get("settings.producer.plugins."+name+".indices")),
405-
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
406-
}))
407-
}
396+
ss = append(ss, senders.NewElasticSearchSender(&senders.ElasticSearchSenderCfg{
397+
Name: name,
398+
BatchSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".msg_batch_size"),
399+
Addr: utils.Settings.GetString("settings.producer.plugins." + name + ".addr"),
400+
MaxWait: utils.Settings.GetDuration("settings.producer.plugins."+name+".max_wait_sec") * time.Second,
401+
RetryChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".retry_chan_len"),
402+
InChanSize: utils.Settings.GetInt("settings.producer.sender_inchan_size"),
403+
NFork: utils.Settings.GetInt("settings.producer.plugins." + name + ".forks"),
404+
TagKey: utils.Settings.GetString("settings.producer.plugins." + name + ".tag_key"),
405+
Tags: libs.LoadTagsAppendEnv(env, utils.Settings.GetStringSlice("settings.producer.plugins."+name+".tags")),
406+
TagIndexMap: senders.LoadESTagIndexMap(env, utils.Settings.Get("settings.producer.plugins."+name+".indices")),
407+
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
408+
}))
409+
case "null":
410+
ss = append(ss, senders.NewNullSender(&senders.NullSenderCfg{
411+
Name: name,
412+
Tags: libs.LoadTagsAppendEnv(env, utils.Settings.GetStringSlice("settings.producer.plugins."+name+".tags")),
413+
LogLevel: utils.Settings.GetString("settings.producer.plugins." + name + ".log_level"),
414+
InChanSize: utils.Settings.GetInt("settings.producer.plugins." + name + ".sender_inchan_size"),
415+
IsCommit: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_commit"),
416+
IsDiscardWhenBlocked: utils.Settings.GetBool("settings.producer.plugins." + name + ".is_discard_when_blocked"),
417+
}))
408418
default:
409419
utils.Logger.Panic("unknown sender type",
410420
zap.String("sender_type", utils.Settings.GetString("settings.producer.plugins."+name+".type")),

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.12
44

55
require (
66
github.com/Laisky/go-syslog v2.3.3+incompatible
7-
github.com/Laisky/go-utils v1.3.2
7+
github.com/Laisky/go-utils v1.3.4
88
github.com/Laisky/zap v1.9.2
99
github.com/Shopify/sarama v1.22.0
1010
github.com/cespare/xxhash v1.1.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ github.com/Laisky/go-chaining v0.0.0-20180507092046-43dcdc5a21be h1:7Rxhm6IjOtDA
1010
github.com/Laisky/go-chaining v0.0.0-20180507092046-43dcdc5a21be/go.mod h1:1mdzaETo0kjvCQICPSePsoaatJN4l7JvEA1200lyevo=
1111
github.com/Laisky/go-syslog v2.3.3+incompatible h1:TSHhP3iadAPDzC5efyYLPnGkv2pvUtuUInm7poVRkFA=
1212
github.com/Laisky/go-syslog v2.3.3+incompatible/go.mod h1:PPmESkLU3DEbJ3fRXam2hqJTNQVFMggsDXBnOtu2ITk=
13-
github.com/Laisky/go-utils v1.3.2 h1:EU1GKb+idl8CfU8rB+6UENzCWpgf6uLqMKaekTJIbFg=
14-
github.com/Laisky/go-utils v1.3.2/go.mod h1:PEkz+iVEmB3wkKeGaUWdJWMIxQFiDNndcPIdyXASTiY=
13+
github.com/Laisky/go-utils v1.3.4 h1:tLseCy/zUlQ/ZGhFxCAVcS+98K73sZ9qZRN8nVpEC7E=
14+
github.com/Laisky/go-utils v1.3.4/go.mod h1:PEkz+iVEmB3wkKeGaUWdJWMIxQFiDNndcPIdyXASTiY=
1515
github.com/Laisky/zap v1.9.2 h1:7dTtABboHk8DnT0d6Dc8A9Opu2cyIEaMlL9JO11zvag=
1616
github.com/Laisky/zap v1.9.2/go.mod h1:CQdLb2wEfqBvoNLmfOp7wnKTOMvhc4DQRc3xfshL4EQ=
1717
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=

journal.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/Laisky/go-fluentd/libs"
11+
"github.com/Laisky/go-fluentd/monitor"
1112
utils "github.com/Laisky/go-utils"
1213
"github.com/Laisky/go-utils/journal"
1314
"github.com/Laisky/zap"
@@ -38,7 +39,7 @@ func NewJournal(cfg *JournalCfg) *Journal {
3839
utils.Logger.Warn("journal buf file size too small", zap.Int64("size", cfg.BufSizeBytes))
3940
}
4041

41-
return &Journal{
42+
j := &Journal{
4243
JournalCfg: cfg,
4344
j: journal.NewJournal(&journal.JournalConfig{
4445
BufDirPath: cfg.BufDirPath,
@@ -47,10 +48,12 @@ func NewJournal(cfg *JournalCfg) *Journal {
4748
legacyLock: 0,
4849
outChan: make(chan *libs.FluentMsg, cfg.JournalOutChanLen),
4950
}
51+
j.registerMonitor()
52+
return j
5053
}
5154

52-
// LoadMaxId load the max committed id from journal
53-
func (j *Journal) LoadMaxId() (int64, error) {
55+
// LoadMaxID load the max committed id from journal
56+
func (j *Journal) LoadMaxID() (int64, error) {
5457
return j.j.LoadMaxId()
5558
}
5659

@@ -217,3 +220,9 @@ func (j *Journal) GetCommitChan() chan<- int64 {
217220

218221
return commitChan
219222
}
223+
224+
func (j *Journal) registerMonitor() {
225+
monitor.AddMetric("journal", func() map[string]interface{} {
226+
return j.j.GetMetric()
227+
})
228+
}

senders/elasticsearch.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,12 @@ type ESIndexResp struct {
155155
Status int `json:"status"`
156156
}
157157

158+
func isStatusCodeOk(s int) bool {
159+
return s/100 == 2
160+
}
161+
158162
func (s *ElasticSearchSender) checkResp(resp *http.Response) (err error) {
159-
if utils.FloorDivision(resp.StatusCode, 100) != 2 {
163+
if !isStatusCodeOk(resp.StatusCode) {
160164
return fmt.Errorf("server return error code %v", resp.StatusCode)
161165
}
162166

@@ -175,7 +179,7 @@ func (s *ElasticSearchSender) checkResp(resp *http.Response) (err error) {
175179
}
176180

177181
for _, v := range ret.Items {
178-
if utils.FloorDivision(v.Index.Status, 100) != 2 {
182+
if !isStatusCodeOk(resp.StatusCode) {
179183
// do not retry if there is part of msgs got error
180184
utils.Logger.Warn("bulk got error for idx",
181185
zap.ByteString("body", bb),

senders/fluentd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (s *FluentSender) GetName() string {
4949
}
5050

5151
func (s *FluentSender) Spawn(tag string) chan<- *libs.FluentMsg {
52-
utils.Logger.Info("SpawnForTag", zap.String("tag", tag))
52+
utils.Logger.Info("spawn for tag", zap.String("tag", tag))
5353
inChan := make(chan *libs.FluentMsg, s.InChanSize) // for each tag
5454
go s.runFlusher(inChan)
5555

0 commit comments

Comments
 (0)