
Commit 691ac53

Merge branch 'develop'
2 parents a338d86 + 5d7ca52


46 files changed: +1031 / -655 lines

.docker/Dockerfile

Lines changed: 9 additions & 3 deletions

```diff
@@ -1,5 +1,10 @@
 # docker build --build-arg http_proxy=http://172.16.4.26:17777 --build-arg https_proxy=http://172.16.4.26:17777
-FROM ppcelery/gobase:1.14.0-alpine3.11 AS gobuild
+FROM golang:1.14.3-buster AS gobuild
+
+# install dependencies
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends g++ make gcc git build-essential ca-certificates curl \
+    && update-ca-certificates
 
 ENV GO111MODULE=on
 WORKDIR /go-fluentd
@@ -14,8 +19,9 @@ RUN go build -a -ldflags '-w -extldflags "-static"' entrypoints/main.go
 
 
 # copy executable file and certs to a pure container
-FROM alpine:3.11
+FROM debian:buster
 COPY --from=gobuild /etc/ssl/certs /etc/ssl/certs
 COPY --from=gobuild /go-fluentd/main go-fluentd
 
-CMD ["./go-fluentd", "--config=/etc/go-fluentd/settings"]
+ENTRYPOINT [ "./go-fluentd" ]
+CMD ["--config=/etc/go-fluentd/settings.yml"]
```

.docker/README.md

Lines changed: 1 addition & 8 deletions

````diff
@@ -13,13 +13,6 @@ there are two kinds of runner container:
 build on machine that should installed docker & golang.
 
 ```sh
-go mod download
-go mod vendor
-
-# build base image
-docker build . -f ./.docker/gobase.Dockerfile -t ppcelery/gobase:1.14.0-alpine3.11
-docker push ppcelery/gobase:1.14.0-alpine3.11
-
 # build image
 docker build . -f ./.docker/Dockerfile -t ppcelery/go-fluentd:1.12.7
 docker push ppcelery/go-fluentd:1.12.7
@@ -29,7 +22,7 @@ docker run -it --rm \
 -v /opt/configs/go-fluentd:/etc/go-fluentd \
 -v /data/log/fluentd/go-concator:/data/log/fluentd/go-concator \
 ppcelery/go-fluentd:1.12.7 \
-./go-fluentd --config=/etc/go-fluentd --env=prod --addr=0.0.0.0:22800 --log-level=error
+./go-fluentd --config=/etc/go-fluentd/ --env=prod --addr=0.0.0.0:22800 --log-level=error
 
 ```
 
 ## go-fluentd-foward
````

.docker/gobase.Dockerfile

Lines changed: 0 additions & 8 deletions
This file was deleted.

acceptor.go

Lines changed: 35 additions & 12 deletions

```diff
@@ -28,30 +28,53 @@ type Acceptor struct {
 
 // NewAcceptor create new Acceptor
 func NewAcceptor(cfg *AcceptorCfg, recvs ...recvs.AcceptorRecvItf) *Acceptor {
-	utils.Logger.Info("create Acceptor")
-
-	if cfg.MaxRotateID < 100 {
-		utils.Logger.Error("MaxRotateID should not too small", zap.Int64("rotate", cfg.MaxRotateID))
-	} else if cfg.MaxRotateID < 1000000 {
-		utils.Logger.Warn("MaxRotateID should not too small", zap.Int64("rotate", cfg.MaxRotateID))
-	}
-
-	return &Acceptor{
+	a := &Acceptor{
 		AcceptorCfg:  cfg,
 		syncOutChan:  make(chan *libs.FluentMsg, cfg.SyncOutChanSize),
 		asyncOutChan: make(chan *libs.FluentMsg, cfg.AsyncOutChanSize),
 		recvs:        recvs,
 	}
+	if err := a.valid(); err != nil {
+		libs.Logger.Panic("new acceptor", zap.Error(err))
+	}
+
+	libs.Logger.Info("create acceptor",
+		zap.Int64("max_rotate_id", a.MaxRotateID),
+		zap.Int("sync_out_chan_size", a.SyncOutChanSize),
+		zap.Int("async_out_chan_size", a.AsyncOutChanSize),
+	)
+	return a
+}
+
+func (a *Acceptor) valid() error {
+	if a.MaxRotateID == 0 {
+		a.MaxRotateID = 372036854775807
+		libs.Logger.Info("reset max_rotate_id", zap.Int64("max_rotate_id", a.MaxRotateID))
+	} else if a.MaxRotateID < 1000000 {
+		libs.Logger.Warn("max_rotate_id should not too small", zap.Int64("max_rotate_id", a.MaxRotateID))
+	}
+
+	if a.SyncOutChanSize == 0 {
+		a.SyncOutChanSize = 10000
+		libs.Logger.Info("reset sync_out_chan_size", zap.Int("sync_out_chan_size", a.SyncOutChanSize))
+	}
+
+	if a.AsyncOutChanSize == 0 {
+		a.AsyncOutChanSize = 10000
+		libs.Logger.Info("reset async_out_chan_size", zap.Int("async_out_chan_size", a.AsyncOutChanSize))
+	}
+
+	return nil
 }
 
 // Run starting acceptor to listening and receive messages,
 // you can use `acceptor.MessageChan()` to load messages`
 func (a *Acceptor) Run(ctx context.Context) {
 	// got exists max id from legacy
-	utils.Logger.Info("process legacy data...")
+	libs.Logger.Info("process legacy data...")
 	maxID, err := a.Journal.LoadMaxID()
 	if err != nil {
-		utils.Logger.Panic("try to process legacy messages got error", zap.Error(err))
+		libs.Logger.Panic("try to process legacy messages got error", zap.Error(err))
 	}
 
 	couter, err := utils.NewParallelCounterFromN((maxID+1)%a.MaxRotateID, 10000, a.MaxRotateID)
@@ -60,7 +83,7 @@ func (a *Acceptor) Run(ctx context.Context) {
 	}
 
 	for _, recv := range a.recvs {
-		utils.Logger.Info("enable recv", zap.String("name", recv.GetName()))
+		libs.Logger.Info("enable recv", zap.String("name", recv.GetName()))
 		recv.SetAsyncOutChan(a.asyncOutChan)
 		recv.SetSyncOutChan(a.syncOutChan)
 		recv.SetMsgPool(a.MsgPool)
```
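
The new `valid()` method treats zero-valued config fields as "use a default" and only warns about suspicious values, instead of logging errors and carrying on as the old constructor did. Below is a minimal, standalone sketch of that defaulting pattern; the `Cfg` type is hypothetical and the standard-library `log` stands in for the project's `libs.Logger`:

```go
package main

import "log"

// Cfg is a hypothetical config; zero values mean "pick a default",
// mirroring how AcceptorCfg's MaxRotateID and channel sizes are handled.
type Cfg struct {
	MaxRotateID      int64
	SyncOutChanSize  int
	AsyncOutChanSize int
}

// valid fills in defaults for unset fields and only warns on
// suspicious (but still usable) values, so callers never fail
// over an omitted setting.
func (c *Cfg) valid() error {
	if c.MaxRotateID == 0 {
		c.MaxRotateID = 372036854775807 // same default the commit uses
		log.Printf("reset max_rotate_id=%d", c.MaxRotateID)
	} else if c.MaxRotateID < 1000000 {
		log.Printf("warn: max_rotate_id=%d is quite small", c.MaxRotateID)
	}
	if c.SyncOutChanSize == 0 {
		c.SyncOutChanSize = 10000
	}
	if c.AsyncOutChanSize == 0 {
		c.AsyncOutChanSize = 10000
	}
	return nil
}

func main() {
	cfg := &Cfg{} // everything left at zero
	if err := cfg.valid(); err != nil {
		log.Fatal(err)
	}
	log.Printf("effective cfg: %+v", *cfg)
}
```

With this shape, a caller can construct the component from a partially filled config and rely on `valid()` to make it usable, which is exactly what `NewAcceptor` now does before logging the effective values.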

acceptorFilters/default_f.go

Lines changed: 3 additions & 4 deletions

```diff
@@ -2,7 +2,6 @@ package acceptorFilters
 
 import (
 	"github.com/Laisky/go-fluentd/libs"
-	utils "github.com/Laisky/go-utils"
 	"github.com/Laisky/zap"
 )
 
@@ -19,7 +18,7 @@ type DefaultFilter struct {
 }
 
 func NewDefaultFilter(cfg *DefaultFilterCfg) *DefaultFilter {
-	utils.Logger.Info("NewDefaultFilter")
+	libs.Logger.Info("NewDefaultFilter")
 
 	f := &DefaultFilter{
 		BaseFilter: &BaseFilter{},
@@ -45,13 +44,13 @@ func (f *DefaultFilter) IsTagInConfigs(tag string) (ok bool) {
 
 func (f *DefaultFilter) Filter(msg *libs.FluentMsg) *libs.FluentMsg {
 	if f.RemoveEmptyTag && msg.Tag == "" {
-		utils.Logger.Warn("discard log since empty tag", zap.String("tag", msg.Tag))
+		libs.Logger.Warn("discard log since empty tag", zap.String("tag", msg.Tag))
 		f.DiscardMsg(msg)
 		return nil
 	}
 
 	if f.RemoveUnsupportTag && !f.IsTagInConfigs(msg.Tag) {
-		utils.Logger.Warn("discard log since unsupported tag", zap.String("tag", msg.Tag))
+		libs.Logger.Warn("discard log since unsupported tag", zap.String("tag", msg.Tag))
 		f.DiscardMsg(msg)
 		return nil
 	}
```
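
This file (and the other filters touched by the commit) drops the `utils "github.com/Laisky/go-utils"` import and logs through `libs.Logger`, so the logger now lives in the project's own `libs` package. The diff does not show how `libs.Logger` is defined; the following is only a rough sketch of what such a package-level logger could look like, using `go.uber.org/zap` as a stand-in for the `github.com/Laisky/zap` fork:

```go
// Package libs — a guess at how a shared, package-level logger such as
// libs.Logger might be wired up; the real definition is not part of this
// diff, and go.uber.org/zap stands in for the github.com/Laisky/zap fork.
package libs

import "go.uber.org/zap"

// Logger is shared by every package in the project, so call sites write
// libs.Logger.Info(...) instead of importing go-utils for its logger.
var Logger *zap.Logger

func init() {
	var err error
	if Logger, err = zap.NewProduction(); err != nil {
		panic(err)
	}
}
```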

acceptorFilters/pipeline.go

Lines changed: 59 additions & 16 deletions

```diff
@@ -28,25 +28,24 @@ type AcceptorPipeline struct {
 }
 
 func NewAcceptorPipeline(ctx context.Context, cfg *AcceptorPipelineCfg, filters ...AcceptorFilterItf) (a *AcceptorPipeline, err error) {
-	utils.Logger.Info("NewAcceptorPipeline")
-	if cfg.NFork < 1 {
-		panic(fmt.Errorf("NFork should greater than 1, got: %v", cfg.NFork))
-	}
-
 	a = &AcceptorPipeline{
 		AcceptorPipelineCfg: cfg,
 		filters:             filters,
 		reEnterChan:         make(chan *libs.FluentMsg, cfg.ReEnterChanSize),
 		counter:             utils.NewCounter(),
 	}
+	if err := a.valid(); err != nil {
+		libs.Logger.Panic("invalid cfg for acceptor pipeline")
+	}
+
 	a.registerMonitor()
 	for _, filter := range a.filters {
 		filter.SetUpstream(a.reEnterChan)
 		filter.SetMsgPool(a.MsgPool)
 	}
 
 	if a.IsThrottle {
-		utils.Logger.Info("enable acceptor throttle",
+		libs.Logger.Info("enable acceptor throttle",
 			zap.Int("max", a.ThrottleMax),
 			zap.Int("n_perf_sec", a.ThrottleNPerSec))
 		if a.throttle, err = utils.NewThrottleWithCtx(
@@ -59,9 +58,53 @@ func NewAcceptorPipeline(ctx context.Context, cfg *AcceptorPipelineCfg, filters
 		}
 	}
 
+	libs.Logger.Info("new acceptor pipeline",
+		zap.Int("n_fork", a.NFork),
+		zap.Int("out_buf_len", a.OutChanSize),
+		zap.Int("reenter_chan_len", a.ReEnterChanSize),
+		zap.Int("throttle_max", a.ThrottleMax),
+		zap.Int("throttle_per_sec", a.ThrottleNPerSec),
+		zap.Bool("is_throttle", a.IsThrottle),
+	)
 	return a, nil
 }
 
+func (f *AcceptorPipeline) valid() error {
+	if f.NFork <= 0 {
+		f.NFork = 4
+		libs.Logger.Info("reset n_fork", zap.Int("n_fork", f.NFork))
+	}
+
+	if f.OutChanSize <= 0 {
+		f.OutChanSize = 1000
+		libs.Logger.Info("reset out_buf_len", zap.Int("out_buf_len", f.OutChanSize))
+	}
+
+	if f.ReEnterChanSize <= 0 {
+		f.ReEnterChanSize = 1000
+		libs.Logger.Info("reset reenter_chan_len", zap.Int("reenter_chan_len", f.ReEnterChanSize))
+	}
+
+	if f.NFork <= 0 {
+		f.NFork = 4
+		libs.Logger.Info("reset n_fork", zap.Int("n_fork", f.NFork))
+	}
+
+	if f.IsThrottle {
+		if f.ThrottleMax <= 0 {
+			f.ThrottleMax = 10000
+			libs.Logger.Info("reset throttle_max", zap.Int("throttle_max", f.ThrottleMax))
+		}
+
+		if f.ThrottleNPerSec <= 0 {
+			f.ThrottleNPerSec = 1000
+			libs.Logger.Info("reset throttle_per_sec", zap.Int("throttle_per_sec", f.ThrottleNPerSec))
+		}
+	}
+
+	return nil
+}
+
 func (f *AcceptorPipeline) registerMonitor() {
 	monitor.AddMetric("acceptorPipeline", func() map[string]interface{} {
 		metrics := map[string]interface{}{
@@ -88,7 +131,7 @@ func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan cha
 		msg *libs.FluentMsg
 		ok  bool
 	)
-	defer utils.Logger.Info("quit acceptorPipeline asyncChan", zap.String("last_msg", fmt.Sprint(msg)))
+	defer libs.Logger.Info("quit acceptorPipeline asyncChan", zap.String("last_msg", fmt.Sprint(msg)))
 
 NEXT_ASYNC_MSG:
 	for {
@@ -97,21 +140,21 @@ func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan cha
 			return
 		case msg, ok = <-f.reEnterChan: // CAUTION: do not put msg into reEnterChan forever
 			if !ok {
-				utils.Logger.Info("reEnterChan closed")
+				libs.Logger.Info("reEnterChan closed")
 				return
 			}
 		case msg, ok = <-asyncInChan:
 			if !ok {
-				utils.Logger.Info("asyncInChan closed")
+				libs.Logger.Info("asyncInChan closed")
 				return
 			}
 		}
 		f.counter.Count()
 
-		// utils.Logger.Debug("AcceptorPipeline got msg")
+		// libs.Logger.Debug("AcceptorPipeline got msg")
 
 		if f.IsThrottle && !f.throttle.Allow() {
-			utils.Logger.Warn("discard msg by throttle", zap.String("tag", msg.Tag))
+			libs.Logger.Warn("discard msg by throttle", zap.String("tag", msg.Tag))
 			f.DiscardMsg(msg)
 			continue
 		}
@@ -129,7 +172,7 @@ func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan cha
 			case outChan <- msg:
 			case skipDumpChan <- msg: // baidu has low disk performance
 			default:
-				utils.Logger.Error("discard msg since disk & downstream are busy", zap.String("tag", msg.Tag))
+				libs.Logger.Error("discard msg since disk & downstream are busy", zap.String("tag", msg.Tag))
 				f.MsgPool.Put(msg)
 			}
 		}
@@ -143,7 +186,7 @@ func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan cha
 		msg *libs.FluentMsg
 		ok  bool
 	)
-	defer utils.Logger.Info("quit acceptorPipeline syncChan", zap.String("last_msg", fmt.Sprint(msg)))
+	defer libs.Logger.Info("quit acceptorPipeline syncChan", zap.String("last_msg", fmt.Sprint(msg)))
 
 NEXT_SYNC_MSG:
 	for {
@@ -152,15 +195,15 @@ func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan cha
 			return
 		case msg, ok = <-syncInChan:
 			if !ok {
-				utils.Logger.Info("syncInChan closed")
+				libs.Logger.Info("syncInChan closed")
 				return
 			}
 		}
-		// utils.Logger.Debug("AcceptorPipeline got blockable msg")
+		// libs.Logger.Debug("AcceptorPipeline got blockable msg")
 		f.counter.Count()
 
 		if f.IsThrottle && !f.throttle.Allow() {
-			utils.Logger.Warn("discard msg by throttle", zap.String("tag", msg.Tag))
+			libs.Logger.Warn("discard msg by throttle", zap.String("tag", msg.Tag))
 			f.DiscardMsg(msg)
 			continue
 		}
```
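
Both worker loops keep the same throttle-and-discard behavior: when throttling is enabled and the limiter refuses a message, the message is dropped rather than allowed to block the channel. The hunk cuts off the arguments to `utils.NewThrottleWithCtx`, so the sketch below uses `golang.org/x/time/rate` purely as a stand-in limiter to illustrate the allow-or-discard pattern; the tiny limit and the `msg` type are illustrative only (the commit's actual defaults are `throttle_per_sec=1000`, `throttle_max=10000`):

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// msg is a stand-in for *libs.FluentMsg.
type msg struct{ tag string }

func main() {
	// Stand-in for the pipeline's throttle; a deliberately tiny limit
	// (1 event/sec, burst 1) so the discard branch actually fires here.
	limiter := rate.NewLimiter(rate.Limit(1), 1)

	in := make(chan *msg, 3)
	for i := 0; i < 3; i++ {
		in <- &msg{tag: fmt.Sprintf("app.%d", i)}
	}
	close(in)

	for m := range in {
		if !limiter.Allow() {
			// The real pipeline logs a warning and calls f.DiscardMsg(msg) here.
			fmt.Println("discard msg by throttle", m.tag)
			continue
		}
		fmt.Println("pass", m.tag)
	}
}
```

Dropping under pressure keeps the acceptor from back-pressuring every receiver, which is the same trade-off the `default:` branch makes when both the disk dump and the downstream channel are busy.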

acceptorFilters/spark_f.go

Lines changed: 2 additions & 3 deletions

```diff
@@ -5,7 +5,6 @@ import (
 	"regexp"
 
 	"github.com/Laisky/go-fluentd/libs"
-	"github.com/Laisky/go-utils"
 	"github.com/Laisky/zap"
 )
 
@@ -22,7 +21,7 @@ type SparkFilter struct {
 }
 
 func NewSparkFilter(cfg *SparkFilterCfg) *SparkFilter {
-	utils.Logger.Info("NewSparkFilter",
+	libs.Logger.Info("NewSparkFilter",
 		zap.String("regex", cfg.IgnoreRegex.String()),
 		zap.String("tag", cfg.Tag))
 
@@ -52,7 +51,7 @@ func (f *SparkFilter) Filter(msg *libs.FluentMsg) *libs.FluentMsg {
 	}
 
 	// discard some format
-	// utils.Logger.Debug("ignore spark log",
+	// libs.Logger.Debug("ignore spark log",
 	// 	zap.String("tag", f.Tag),
 	// 	zap.ByteString("log", msg.Message[f.MsgKey].([]byte)))
 	if f.IgnoreRegex.Match(msg.Message[f.MsgKey].([]byte)) {
```
