|
| 1 | +diff --git a/internal/component/loki/process/stages/extensions.go b/internal/component/loki/process/stages/extensions.go |
| 2 | +index 3c7a4262b..0b19655ab 100644 |
| 3 | +--- a/internal/component/loki/process/stages/extensions.go |
| 4 | ++++ b/internal/component/loki/process/stages/extensions.go |
| 5 | +@@ -3,6 +3,7 @@ package stages |
| 6 | + import ( |
| 7 | + "fmt" |
| 8 | + "strings" |
| 9 | ++ "time" |
| 10 | + |
| 11 | + "github.com/go-kit/log" |
| 12 | + "github.com/grafana/alloy/internal/featuregate" |
| 13 | +@@ -23,9 +24,10 @@ type DockerConfig struct{} |
| 14 | + // CRIConfig is an empty struct that is used to enable a pre-defined pipeline |
| 15 | + // for decoding entries that are using the CRI logging format. |
| 16 | + type CRIConfig struct { |
| 17 | +- MaxPartialLines int `alloy:"max_partial_lines,attr,optional"` |
| 18 | +- MaxPartialLineSize uint64 `alloy:"max_partial_line_size,attr,optional"` |
| 19 | +- MaxPartialLineSizeTruncate bool `alloy:"max_partial_line_size_truncate,attr,optional"` |
| 20 | ++ MaxPartialLines int `alloy:"max_partial_lines,attr,optional"` |
| 21 | ++ MaxPartialLineSize uint64 `alloy:"max_partial_line_size,attr,optional"` |
| 22 | ++ MaxPartialLineSizeTruncate bool `alloy:"max_partial_line_size_truncate,attr,optional"` |
| 23 | ++ MaxPartialLineAge time.Duration `alloy:"max_partial_line_age,attr,optional"` |
| 24 | + } |
| 25 | + |
| 26 | + var ( |
| 27 | +@@ -38,6 +40,7 @@ var DefaultCRIConfig = CRIConfig{ |
| 28 | + MaxPartialLines: 100, |
| 29 | + MaxPartialLineSize: 0, |
| 30 | + MaxPartialLineSizeTruncate: false, |
| 31 | ++ MaxPartialLineAge: time.Minute, |
| 32 | + } |
| 33 | + |
| 34 | + // SetToDefault implements syntax.Defaulter. |
| 35 | +@@ -50,6 +53,9 @@ func (args *CRIConfig) Validate() error { |
| 36 | + if args.MaxPartialLines <= 0 { |
| 37 | + return fmt.Errorf("max_partial_lines must be greater than 0") |
| 38 | + } |
| 39 | ++ if args.MaxPartialLineAge <= time.Duration(0) { |
| 40 | ++ return fmt.Errorf("max_partial_line_age must be greater than 0") |
| 41 | ++ } |
| 42 | + |
| 43 | + return nil |
| 44 | + } |
| 45 | +@@ -110,7 +116,7 @@ func (*cri) Cleanup() { |
| 46 | + func (c *cri) Run(entry chan Entry) chan Entry { |
| 47 | + entry = c.base.Run(entry) |
| 48 | + |
| 49 | +- in := RunWithSkipOrSendMany(entry, func(e Entry) ([]Entry, bool) { |
| 50 | ++ in := RunWithSkipOrSendManyWithTick(entry, func(e Entry) ([]Entry, bool) { |
| 51 | + fingerprint := e.Labels.Fingerprint() |
| 52 | + |
| 53 | + // We received partial-line (tag: "P") |
| 54 | +@@ -157,6 +163,23 @@ func (c *cri) Run(entry chan Entry) chan Entry { |
| 55 | + delete(c.partialLines, fingerprint) |
| 56 | + } |
| 57 | + return []Entry{e}, false |
| 58 | ++ }, 10*time.Second, func() []Entry { |
| 59 | ++ // Send partial lines which are left unsent for a while. |
| 60 | ++ threshold := time.Now().Add(-c.cfg.MaxPartialLineAge) |
| 61 | ++ |
| 62 | ++ entries := make([]Entry, 0) |
| 63 | ++ fingerprints := make([]model.Fingerprint, 0) |
| 64 | ++ for k, v := range c.partialLines { |
| 65 | ++ if v.Timestamp.Before(threshold) { |
| 66 | ++ level.Warn(c.base.logger).Log("msg", "cri stage: flushing partial line due to max age", "labels", v.Labels) |
| 67 | ++ entries = append(entries, v) |
| 68 | ++ fingerprints = append(fingerprints, k) |
| 69 | ++ } |
| 70 | ++ } |
| 71 | ++ for _, fp := range fingerprints { |
| 72 | ++ delete(c.partialLines, fp) |
| 73 | ++ } |
| 74 | ++ return entries |
| 75 | + }) |
| 76 | + |
| 77 | + return in |
| 78 | +diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go |
| 79 | +index f4404941c..88ec9d600 100644 |
| 80 | +--- a/internal/component/loki/process/stages/pipeline.go |
| 81 | ++++ b/internal/component/loki/process/stages/pipeline.go |
| 82 | +@@ -4,6 +4,7 @@ import ( |
| 83 | + "context" |
| 84 | + "fmt" |
| 85 | + "sync" |
| 86 | ++ "time" |
| 87 | + |
| 88 | + "github.com/go-kit/log" |
| 89 | + "github.com/prometheus/client_golang/prometheus" |
| 90 | +@@ -108,6 +109,38 @@ func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, boo |
| 91 | + return out |
| 92 | + } |
| 93 | + |
| 94 | ++// RunWithSkipOrSendManyWithTick same as RunWithSkipOrSendMany, except it can run `tick` function periodically. |
| 95 | ++func RunWithSkipOrSendManyWithTick(input chan Entry, process func(e Entry) ([]Entry, bool), interval time.Duration, tick func() []Entry) chan Entry { |
| 96 | ++ out := make(chan Entry) |
| 97 | ++ go func() { |
| 98 | ++ defer close(out) |
| 99 | ++ ticker := time.NewTicker(interval) |
| 100 | ++ defer ticker.Stop() |
| 101 | ++ for { |
| 102 | ++ select { |
| 103 | ++ case e, ok := <-input: |
| 104 | ++ if !ok { |
| 105 | ++ return |
| 106 | ++ } |
| 107 | ++ results, skip := process(e) |
| 108 | ++ if skip { |
| 109 | ++ continue |
| 110 | ++ } |
| 111 | ++ for _, result := range results { |
| 112 | ++ out <- result |
| 113 | ++ } |
| 114 | ++ case <-ticker.C: |
| 115 | ++ results := tick() |
| 116 | ++ for _, result := range results { |
| 117 | ++ out <- result |
| 118 | ++ } |
| 119 | ++ } |
| 120 | ++ } |
| 121 | ++ }() |
| 122 | ++ |
| 123 | ++ return out |
| 124 | ++} |
| 125 | ++ |
| 126 | + // Run implements Stage |
| 127 | + func (p *Pipeline) Run(in chan Entry) chan Entry { |
| 128 | + in = RunWith(in, func(e Entry) Entry { |
0 commit comments