Skip to content

Commit 5d7ca52

Browse files
committed
Merge branch 'feature/test' into develop
2 parents 0718c04 + fba84ae commit 5d7ca52

File tree

11 files changed

+127
-60
lines changed

11 files changed

+127
-60
lines changed

acceptor.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,31 +28,43 @@ type Acceptor struct {
2828

2929
// NewAcceptor create new Acceptor
3030
func NewAcceptor(cfg *AcceptorCfg, recvs ...recvs.AcceptorRecvItf) *Acceptor {
31-
libs.Logger.Info("create Acceptor")
32-
33-
if cfg.MaxRotateID == 0 {
34-
cfg.MaxRotateID = 372036854775807
35-
libs.Logger.Info("reset MaxRotateID", zap.Int("MaxRotateID", 372036854775807))
36-
} else if cfg.MaxRotateID < 100 {
37-
libs.Logger.Error("MaxRotateID should not too small", zap.Int64("rotate", cfg.MaxRotateID))
38-
} else if cfg.MaxRotateID < 1000000 {
39-
libs.Logger.Warn("MaxRotateID should not too small", zap.Int64("rotate", cfg.MaxRotateID))
40-
}
41-
if cfg.SyncOutChanSize == 0 {
42-
cfg.SyncOutChanSize = 10000
43-
libs.Logger.Info("reset SyncOutChanSize", zap.Int("SyncOutChanSize", 10000))
44-
}
45-
if cfg.AsyncOutChanSize == 0 {
46-
cfg.AsyncOutChanSize = 10000
47-
libs.Logger.Info("reset AsyncOutChanSize", zap.Int("AsyncOutChanSize", 10000))
48-
}
49-
50-
return &Acceptor{
31+
a := &Acceptor{
5132
AcceptorCfg: cfg,
5233
syncOutChan: make(chan *libs.FluentMsg, cfg.SyncOutChanSize),
5334
asyncOutChan: make(chan *libs.FluentMsg, cfg.AsyncOutChanSize),
5435
recvs: recvs,
5536
}
37+
if err := a.valid(); err != nil {
38+
libs.Logger.Panic("new acceptor", zap.Error(err))
39+
}
40+
41+
libs.Logger.Info("create acceptor",
42+
zap.Int64("max_rotate_id", a.MaxRotateID),
43+
zap.Int("sync_out_chan_size", a.SyncOutChanSize),
44+
zap.Int("async_out_chan_size", a.AsyncOutChanSize),
45+
)
46+
return a
47+
}
48+
49+
func (a *Acceptor) valid() error {
50+
if a.MaxRotateID == 0 {
51+
a.MaxRotateID = 372036854775807
52+
libs.Logger.Info("reset max_rotate_id", zap.Int64("max_rotate_id", a.MaxRotateID))
53+
} else if a.MaxRotateID < 1000000 {
54+
libs.Logger.Warn("max_rotate_id should not too small", zap.Int64("max_rotate_id", a.MaxRotateID))
55+
}
56+
57+
if a.SyncOutChanSize == 0 {
58+
a.SyncOutChanSize = 10000
59+
libs.Logger.Info("reset sync_out_chan_size", zap.Int("sync_out_chan_size", a.SyncOutChanSize))
60+
}
61+
62+
if a.AsyncOutChanSize == 0 {
63+
a.AsyncOutChanSize = 10000
64+
libs.Logger.Info("reset async_out_chan_size", zap.Int("async_out_chan_size", a.AsyncOutChanSize))
65+
}
66+
67+
return nil
5668
}
5769

5870
// Run starting acceptor to listening and receive messages,

dispacher.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ type Dispatcher struct {
3030

3131
// NewDispatcher create new Dispatcher
3232
func NewDispatcher(cfg *DispatcherCfg) *Dispatcher {
33-
libs.Logger.Info("create Dispatcher")
34-
3533
d := &Dispatcher{
3634
DispatcherCfg: cfg,
3735
outChan: make(chan *libs.FluentMsg, cfg.OutChanSize),
@@ -44,6 +42,10 @@ func NewDispatcher(cfg *DispatcherCfg) *Dispatcher {
4442
libs.Logger.Panic("config invalid", zap.Error(err))
4543
}
4644

45+
libs.Logger.Info("create Dispatcher",
46+
zap.Int("n_fork", d.NFork),
47+
zap.Int("out_chan_size", d.OutChanSize),
48+
)
4749
return d
4850
}
4951

@@ -152,6 +154,10 @@ func (d *Dispatcher) registerMonitor() {
152154
metrics := map[string]interface{}{
153155
"msgPerSec": d.counter.GetSpeed(),
154156
"msgTotal": d.counter.Get(),
157+
"config": map[string]interface{}{
158+
"n_fork": d.NFork,
159+
"out_chan_size": d.OutChanSize,
160+
},
155161
}
156162
d.tag2Counter.Range(func(tagi interface{}, ci interface{}) bool {
157163
metrics[tagi.(string)+".MsgPerSec"] = ci.(*utils.Counter).GetSpeed()

docs/example/app/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def main():
4444
while 1:
4545
cnt += 1
4646
print(produce_log())
47-
time.sleep(0.01)
47+
time.sleep(0.1)
4848
if time.time() - t > 10:
4949
t = time.time()
5050
print(produce_log() + f"speed: {cnt/10}/s")

docs/example/docker-compose.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ services:
1010
driver: "fluentd"
1111
options:
1212
tag: test.sit
13-
fluentd-address: "127.0.0.1:24225"
13+
fluentd-address: "127.0.0.1:25225"
1414
fluentd-async-connect: "true"
1515
mode: non-blocking
1616
max-buffer-size: 1m
@@ -23,10 +23,11 @@ services:
2323
image: ppcelery/go-fluentd:test
2424
restart: "on-failure"
2525
ports:
26-
- 8080:8080
27-
- 24225:24225
26+
- 28080:8080
27+
- 25225:24225
2828
volumes:
2929
- ./go-fluentd/settings.yml:/etc/go-fluentd/settings.yml:ro
30+
- ./data:/data
3031
command: --config=/etc/go-fluentd/settings.yml
3132
--env=sit
3233
--addr=0.0.0.0:8080

docs/example/go-fluentd/settings.yml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ consts:
1212
settings:
1313
acceptor:
1414
recvs:
15+
plugins:
1516
# 监听 DC/OS fluentd log-driver 的日志
1617
fluentd:
1718
type: fluentd
@@ -31,7 +32,7 @@ settings:
3132
log_level: info
3233
is_commit: true
3334
journal:
34-
buf_dir_path: ./data
35+
buf_dir_path: /data
3536
is_compress: true
3637
acceptor_filters:
3738
plugins:
@@ -54,17 +55,12 @@ settings:
5455
type: parser
5556
tags:
5657
- test
57-
msg_key: log
5858
# 2018-03-06 16:56:22.514 | mscparea | INFO | http-nio-8080-exec-1 | com.google.cloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84: test
5959
pattern: (?ms)^(?P<time>.{23}) {0,}\| {0,}(?P<app>[^\|]+) {0,}\| {0,}(?P<level>[^\|]+) {0,}\| {0,}(?P<thread>[^\|]+) {0,}\| {0,}(?P<class>[^ ]+) {0,}\| {0,}(?P<line>\d+) {0,}([\|:] {0,}(?P<args>\{.*\}))?([\|:] {0,}(?P<message>.*))?
6060
is_remove_orig_log: true
6161
must_include: app
6262
# ⬇⬇ time
63-
time_key: time
6463
time_format: "2006-01-02 15:04:05.000 -0700"
65-
new_time_format: "2006-01-02T15:04:05.000000Z"
66-
reserved_time_key: false
67-
new_time_key: "@timestamp"
6864
append_time_zone:
6965
sit: "+0800"
7066
perf: "+0800"

docs/settings/tiny_settings.yml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ consts:
1212
settings:
1313
acceptor:
1414
recvs:
15+
plugins:
1516
# 监听 DC/OS fluentd log-driver 的日志
1617
fluentd:
1718
type: fluentd
@@ -54,17 +55,12 @@ settings:
5455
type: parser
5556
tags:
5657
- test
57-
msg_key: log
5858
# 2018-03-06 16:56:22.514 | mscparea | INFO | http-nio-8080-exec-1 | com.google.cloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84: test
5959
pattern: (?ms)^(?P<time>.{23}) {0,}\| {0,}(?P<app>[^\|]+) {0,}\| {0,}(?P<level>[^\|]+) {0,}\| {0,}(?P<thread>[^\|]+) {0,}\| {0,}(?P<class>[^ ]+) {0,}\| {0,}(?P<line>\d+) {0,}([\|:] {0,}(?P<args>\{.*\}))?([\|:] {0,}(?P<message>.*))?
6060
is_remove_orig_log: true
6161
must_include: app
6262
# ⬇⬇ time
63-
time_key: time
6463
time_format: "2006-01-02 15:04:05.000 -0700"
65-
new_time_format: "2006-01-02T15:04:05.000000Z"
66-
reserved_time_key: false
67-
new_time_key: "@timestamp"
6864
append_time_zone:
6965
sit: "+0800"
7066
perf: "+0800"

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ go 1.12
44

55
require (
66
github.com/Laisky/gin-middlewares v1.1.1
7-
github.com/Laisky/go-journal v1.1.3
7+
github.com/Laisky/go-journal v1.1.5
88
github.com/Laisky/go-kafka v1.0.1
99
github.com/Laisky/go-syslog v2.3.3+incompatible
1010
github.com/Laisky/go-utils v1.12.9
1111
github.com/Laisky/zap v1.12.2
12-
github.com/Shopify/sarama v1.26.3
12+
github.com/Shopify/sarama v1.26.4
1313
github.com/cespare/xxhash v1.1.0
1414
github.com/gin-contrib/pprof v1.3.0
1515
github.com/gin-gonic/gin v1.6.3

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ github.com/Laisky/gin-middlewares v1.1.1 h1:EVFwX94bMVSeJ4a6TXgT6BXvhqyn6j0kKQhE
55
github.com/Laisky/gin-middlewares v1.1.1/go.mod h1:IJxjhBGdN+o6BfGME2qbxZl54i5jQmcnwMh6QUd89T8=
66
github.com/Laisky/go-chaining v0.0.0-20180507092046-43dcdc5a21be h1:7Rxhm6IjOtDAyj8eScOFntevwzkWhx94zi48lxo4m4w=
77
github.com/Laisky/go-chaining v0.0.0-20180507092046-43dcdc5a21be/go.mod h1:1mdzaETo0kjvCQICPSePsoaatJN4l7JvEA1200lyevo=
8-
github.com/Laisky/go-journal v1.1.3 h1:6AMsDuhrWcit6oHkVhcvyJmitt4fFicP4UpvcT8ALlI=
9-
github.com/Laisky/go-journal v1.1.3/go.mod h1:6302T+Uo0+xYp5O9Z+GalHnl+R3hmRyw0aa11IqNX90=
8+
github.com/Laisky/go-journal v1.1.5 h1:6IiDPBrUSiM7TZAHRa7tIoHVBnkPGW2dFYj3q3XryZk=
9+
github.com/Laisky/go-journal v1.1.5/go.mod h1:6302T+Uo0+xYp5O9Z+GalHnl+R3hmRyw0aa11IqNX90=
1010
github.com/Laisky/go-kafka v1.0.1 h1:DNMBLshSMUUedH9nmBhGrpEf0FLGIAQfPlbQBkFcCfw=
1111
github.com/Laisky/go-kafka v1.0.1/go.mod h1:AksqzI2+yDSX6MtQikP6aTg9om52pSdropjst0oqeZE=
1212
github.com/Laisky/go-syslog v2.3.3+incompatible h1:TSHhP3iadAPDzC5efyYLPnGkv2pvUtuUInm7poVRkFA=
@@ -25,8 +25,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
2525
github.com/RoaringBitmap/roaring v0.4.23 h1:gpyfd12QohbqhFO4NVDUdoPOCXsyahYRQhINmlHxKeo=
2626
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
2727
github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
28-
github.com/Shopify/sarama v1.26.3 h1:wSN3FpDXLe3e2z47OzGii5VAK693oVkyHFwh240jWjg=
29-
github.com/Shopify/sarama v1.26.3/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
28+
github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8=
29+
github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
3030
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
3131
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
3232
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

journal.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -571,14 +571,17 @@ func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan,
571571
default:
572572
select {
573573
case jji.(chan *libs.FluentMsg) <- msg:
574-
case j.outChan <- msg:
575-
libs.Logger.Warn("skip dump since journal is busy", zap.String("tag", msg.Tag))
576574
default:
577-
libs.Logger.Error("discard log since of journal & downstream busy",
578-
zap.String("tag", msg.Tag),
579-
zap.String("msg", fmt.Sprint(msg)),
580-
)
581-
j.MsgPool.Put(msg)
575+
select {
576+
case j.outChan <- msg:
577+
libs.Logger.Warn("skip dump since journal is busy", zap.String("tag", msg.Tag))
578+
default:
579+
libs.Logger.Error("discard log since of journal & downstream busy",
580+
zap.String("tag", msg.Tag),
581+
zap.String("msg", fmt.Sprint(msg)),
582+
)
583+
j.MsgPool.Put(msg)
584+
}
582585
}
583586
}
584587
}
@@ -641,7 +644,18 @@ func (j *Journal) startCommitRunner(ctx context.Context) {
641644

642645
func (j *Journal) registerMonitor() {
643646
monitor.AddMetric("journal", func() map[string]interface{} {
644-
result := map[string]interface{}{}
647+
result := map[string]interface{}{
648+
"config": map[string]interface{}{
649+
"compress": j.IsCompress,
650+
"buf_dir_path": j.BufDirPath,
651+
"buf_file_bytes": j.BufSizeBytes,
652+
"gc_inteval_sec": j.GCIntervalSec / time.Second,
653+
"journal_out_chan_len": j.JournalOutChanLen,
654+
"commit_id_chan_len": j.CommitIDChanLen,
655+
"child_data_chan_len": j.ChildJournalDataInchanLen,
656+
"child_id_chan_len": j.ChildJournalIDInchanLen,
657+
},
658+
}
645659
j.tag2JMap.Range(func(k, v interface{}) bool {
646660
result[k.(string)+".journal"] = v.(*journal.Journal).GetMetric()
647661
return true

producer.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,15 @@ func NewProducer(cfg *ProducerCfg, senders ...senders.SenderItf) (*Producer, err
9494
}
9595

9696
libs.Logger.Info("new producer",
97-
zap.Int("nfork", 1),
97+
zap.Int("nfork", p.NFork),
9898
zap.Int("discard_chan_size", p.DiscardChanSize),
9999
)
100100
return p, nil
101101
}
102102

103103
func (p *Producer) valid() error {
104104
if p.NFork <= 0 {
105-
p.NFork = 1
105+
p.NFork = 4
106106
libs.Logger.Info("reset nfork", zap.Int("nfork", 1))
107107
}
108108

@@ -118,6 +118,10 @@ func (p *Producer) valid() error {
118118
func (p *Producer) registerMonitor() {
119119
monitor.AddMetric("producer", func() map[string]interface{} {
120120
metrics := map[string]interface{}{
121+
"config": map[string]interface{}{
122+
"nfork": p.NFork,
123+
"discard_chan_size": p.DiscardChanSize,
124+
},
121125
"msgPerSec": p.counter.GetSpeed(),
122126
"msgTotal": p.counter.Get(),
123127
}

tagFilters/parser_f.go

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,27 +220,65 @@ type ParserFact struct {
220220
}
221221

222222
func NewParserFact(cfg *ParserFactCfg) *ParserFact {
223-
libs.Logger.Info("create new connectorfactory")
224-
225-
if cfg.NFork < 1 {
226-
libs.Logger.Warn("nfork should bigger than 1")
227-
cfg.NFork = 1
228-
}
229-
230223
cf := &ParserFact{
231224
BaseTagFilterFactory: &BaseTagFilterFactory{},
232225
ParserFactCfg: cfg,
233226
}
227+
if err := cf.valid(); err != nil {
228+
libs.Logger.Panic("new parser", zap.Error(err))
229+
}
234230

235231
cf.tagsset = map[string]struct{}{}
236232
for _, tag := range cf.Tags {
237233
libs.Logger.Info("Parser factory add tag", zap.String("tag", tag+"."+cf.Env))
238234
cf.tagsset[tag+"."+cf.Env] = struct{}{}
239235
}
240236

237+
libs.Logger.Info("new parser",
238+
zap.Int("n_fork", cf.NFork),
239+
zap.String("msg_key", cf.MsgKey),
240+
zap.String("time_key", cf.TimeKey),
241+
zap.String("new_time_format", cf.NewTimeFormat),
242+
zap.String("new_time_key", cf.NewTimeKey),
243+
zap.String("msg_key", cf.MsgKey),
244+
)
241245
return cf
242246
}
243247

248+
func (cf *ParserFact) valid() error {
249+
if cf.NFork < 1 {
250+
cf.NFork = 4
251+
libs.Logger.Info("reset n_fork", zap.Int("n_fork", cf.NFork))
252+
}
253+
254+
if cf.MsgKey == "" {
255+
cf.MsgKey = "log"
256+
libs.Logger.Info("reset msg_key", zap.String("msg_key", cf.MsgKey))
257+
}
258+
259+
if cf.TimeKey == "" {
260+
cf.TimeKey = "time"
261+
libs.Logger.Info("reset time_key", zap.String("time_key", cf.TimeKey))
262+
}
263+
264+
if cf.NewTimeFormat == "" {
265+
cf.NewTimeFormat = "2006-01-02T15:04:05.000000Z"
266+
libs.Logger.Info("reset new_time_format", zap.String("new_time_format", cf.NewTimeFormat))
267+
}
268+
269+
if cf.NewTimeKey == "" {
270+
cf.NewTimeKey = "@timestamp"
271+
libs.Logger.Info("reset new_time_key", zap.String("new_time_key", cf.NewTimeKey))
272+
}
273+
274+
if cf.MsgKey == "" {
275+
cf.MsgKey = "log"
276+
libs.Logger.Info("reset msg_key", zap.String("msg_key", cf.MsgKey))
277+
}
278+
279+
return nil
280+
}
281+
244282
func (cf *ParserFact) GetName() string {
245283
return cf.Name + "-parser"
246284
}

0 commit comments

Comments
 (0)