diff --git a/CHANGELOG.md b/CHANGELOG.md index 57db465fc8..81c7d2d6fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Main (unreleased) - Schedule new path targets faster in `loki.source.file`. (@kalleep) + ### Bugfixes - Stop `loki.source.kubernetes` discarding log lines with duplicate timestamps. (@ciaranj) @@ -63,6 +64,8 @@ Main (unreleased) - Fix `otelcol.exporter.splunkhec` arguments missing documented `otel_attrs_to_hec_metadata` block. (@dehaansa) +- `local.file_match` now publish targets faster whenever targets in arguments changes. (@kalleep) + v1.11.2 ----------------- diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 102f81beee..e115bf5015 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -37,20 +37,23 @@ var _ component.Component = (*Component)(nil) type Component struct { opts component.Options - mut sync.RWMutex - args Arguments - watches []watch - watchDog *time.Ticker + mut sync.Mutex + args Arguments + watches []watch + watchDog *time.Ticker + targetsChanged chan struct{} } // New creates a new local.file_match component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, - mut: sync.RWMutex{}, + mut: sync.Mutex{}, args: args, watches: make([]watch, 0), watchDog: time.NewTicker(args.SyncPeriod), + // Buffered channel to avoid blocking + targetsChanged: make(chan struct{}, 1), } if err := c.Update(args); err != nil { @@ -73,11 +76,16 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() + newArgs := args.(Arguments) + // Check to see if our ticker timer needs to be reset. - if args.(Arguments).SyncPeriod != c.args.SyncPeriod { - c.watchDog.Reset(c.args.SyncPeriod) + if newArgs.SyncPeriod != c.args.SyncPeriod { + c.watchDog.Reset(newArgs.SyncPeriod) } - c.args = args.(Arguments) + + c.args = newArgs + + // Rebuild watches c.watches = c.watches[:0] for _, v := range c.args.PathTargets { c.watches = append(c.watches, watch{ @@ -87,27 +95,35 @@ func (c *Component) Update(args component.Arguments) error { }) } + // Always trigger immediate check when Update is called + select { + case c.targetsChanged <- struct{}{}: + default: + } + return nil } // Run satisfies the component interface. func (c *Component) Run(ctx context.Context) error { - update := func() { - c.mut.Lock() - defer c.mut.Unlock() - - paths := c.getWatchedFiles() - // The component node checks to see if exports have actually changed. - c.opts.OnStateChange(discovery.Exports{Targets: paths}) - } - // Trigger initial check - update() defer c.watchDog.Stop() for { select { + case <-c.targetsChanged: + // When we get a signal that we have new targets we will get all watched files and + // reset the timer. + c.mut.Lock() + c.watchDog.Reset(c.args.SyncPeriod) + targets := c.getWatchedFiles() + c.mut.Unlock() + c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-c.watchDog.C: - // This triggers a check for any new paths, along with pushing new targets. - update() + // If we have not received a signal that we have new targets watch job will periodically + // get all files that we should watch. + c.mut.Lock() + targets := c.getWatchedFiles() + c.mut.Unlock() + c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-ctx.Done(): return nil }