From 33dd17ca4ae70dabd2ada324d9b62f992b2b2bd7 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 21 Oct 2025 14:50:49 +0200 Subject: [PATCH 1/3] fix: trigger the sync job whenever the component is updated with new targets --- CHANGELOG.md | 2 + internal/component/local/file_match/file.go | 59 +++++++++++++-------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57db465fc8..f6701eeab0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,8 @@ Main (unreleased) - Schedule new path targets faster in `loki.source.file`. (@kalleep) +- `local.file_match` will now trigger it's sync job when the component is updated with new targets. (@kalleep) + ### Bugfixes - Stop `loki.source.kubernetes` discarding log lines with duplicate timestamps. (@ciaranj) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 102f81beee..6a6c724133 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -37,20 +37,23 @@ var _ component.Component = (*Component)(nil) type Component struct { opts component.Options - mut sync.RWMutex - args Arguments - watches []watch - watchDog *time.Ticker + mut sync.Mutex + args Arguments + watches []watch + watchDog *time.Ticker + targetsChanged chan struct{} } // New creates a new local.file_match component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, - mut: sync.RWMutex{}, + mut: sync.Mutex{}, args: args, watches: make([]watch, 0), watchDog: time.NewTicker(args.SyncPeriod), + // Buffered channel to avoid blocking + targetsChanged: make(chan struct{}, 1), } if err := c.Update(args); err != nil { @@ -73,11 +76,16 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() + newArgs := args.(Arguments) + // Check to see if our ticker timer needs to be reset. - if args.(Arguments).SyncPeriod != c.args.SyncPeriod { - c.watchDog.Reset(c.args.SyncPeriod) + if newArgs.SyncPeriod != c.args.SyncPeriod { + c.watchDog.Reset(newArgs.SyncPeriod) } - c.args = args.(Arguments) + + c.args = newArgs + + // Rebuild watches c.watches = c.watches[:0] for _, v := range c.args.PathTargets { c.watches = append(c.watches, watch{ @@ -87,34 +95,42 @@ func (c *Component) Update(args component.Arguments) error { }) } + // Always trigger immediate check when Update is called + select { + case c.targetsChanged <- struct{}{}: + default: + } + return nil } // Run satisfies the component interface. func (c *Component) Run(ctx context.Context) error { - update := func() { - c.mut.Lock() - defer c.mut.Unlock() - - paths := c.getWatchedFiles() - // The component node checks to see if exports have actually changed. - c.opts.OnStateChange(discovery.Exports{Targets: paths}) - } - // Trigger initial check - update() defer c.watchDog.Stop() for { select { + case <-c.targetsChanged: + // When we get a signal that we have new targets we will get all watched files and + // reset the timer. + c.mut.Lock() + c.watchDog.Reset(c.args.SyncPeriod) + targets := c.getWatchedFiles(true) + c.mut.Unlock() + c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-c.watchDog.C: - // This triggers a check for any new paths, along with pushing new targets. - update() + // If we have not received a signal that we have new targets watch job will periodically + // get all files that we should watch. + c.mut.Lock() + targets := c.getWatchedFiles(false) + c.mut.Unlock() + c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-ctx.Done(): return nil } } } -func (c *Component) getWatchedFiles() []discovery.Target { +func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target { paths := make([]discovery.Target, 0) // See if there is anything new we need to check. for _, w := range c.watches { @@ -126,3 +142,4 @@ func (c *Component) getWatchedFiles() []discovery.Target { } return paths } + From 62658b36b2c662f0f1f02fb1ba3c572377321910 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 21 Oct 2025 15:06:41 +0200 Subject: [PATCH 2/3] remove debug code --- internal/component/local/file_match/file.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 6a6c724133..e115bf5015 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -114,14 +114,14 @@ func (c *Component) Run(ctx context.Context) error { // reset the timer. c.mut.Lock() c.watchDog.Reset(c.args.SyncPeriod) - targets := c.getWatchedFiles(true) + targets := c.getWatchedFiles() c.mut.Unlock() c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-c.watchDog.C: // If we have not received a signal that we have new targets watch job will periodically // get all files that we should watch. c.mut.Lock() - targets := c.getWatchedFiles(false) + targets := c.getWatchedFiles() c.mut.Unlock() c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-ctx.Done(): @@ -130,7 +130,7 @@ func (c *Component) Run(ctx context.Context) error { } } -func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target { +func (c *Component) getWatchedFiles() []discovery.Target { paths := make([]discovery.Target, 0) // See if there is anything new we need to check. for _, w := range c.watches { @@ -142,4 +142,3 @@ func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target { } return paths } - From 25824adca3493f9e7feb9707314fce602f40c0cc Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 21 Oct 2025 16:21:35 +0200 Subject: [PATCH 3/3] update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6701eeab0..81c7d2d6fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,6 @@ Main (unreleased) - Schedule new path targets faster in `loki.source.file`. (@kalleep) -- `local.file_match` will now trigger it's sync job when the component is updated with new targets. (@kalleep) ### Bugfixes @@ -65,6 +64,8 @@ Main (unreleased) - Fix `otelcol.exporter.splunkhec` arguments missing documented `otel_attrs_to_hec_metadata` block. (@dehaansa) +- `local.file_match` now publish targets faster whenever targets in arguments changes. (@kalleep) + v1.11.2 -----------------