Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Main (unreleased)

- Schedule new path targets faster in `loki.source.file`. (@kalleep)


### Bugfixes

- Stop `loki.source.kubernetes` discarding log lines with duplicate timestamps. (@ciaranj)
Expand All @@ -63,6 +64,8 @@ Main (unreleased)

- Fix `otelcol.exporter.splunkhec` arguments missing documented `otel_attrs_to_hec_metadata` block. (@dehaansa)

- `local.file_match` now publish targets faster whenever targets in arguments changes. (@kalleep)

v1.11.2
-----------------

Expand Down
56 changes: 36 additions & 20 deletions internal/component/local/file_match/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,23 @@ var _ component.Component = (*Component)(nil)
type Component struct {
opts component.Options

mut sync.RWMutex
args Arguments
watches []watch
watchDog *time.Ticker
mut sync.Mutex
args Arguments
watches []watch
watchDog *time.Ticker
targetsChanged chan struct{}
}

// New creates a new local.file_match component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
mut: sync.RWMutex{},
mut: sync.Mutex{},
args: args,
watches: make([]watch, 0),
watchDog: time.NewTicker(args.SyncPeriod),
// Buffered channel to avoid blocking
targetsChanged: make(chan struct{}, 1),
}

if err := c.Update(args); err != nil {
Expand All @@ -73,11 +76,16 @@ func (c *Component) Update(args component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()

newArgs := args.(Arguments)

// Check to see if our ticker timer needs to be reset.
if args.(Arguments).SyncPeriod != c.args.SyncPeriod {
c.watchDog.Reset(c.args.SyncPeriod)
if newArgs.SyncPeriod != c.args.SyncPeriod {
c.watchDog.Reset(newArgs.SyncPeriod)
}
c.args = args.(Arguments)

c.args = newArgs

// Rebuild watches
c.watches = c.watches[:0]
for _, v := range c.args.PathTargets {
c.watches = append(c.watches, watch{
Expand All @@ -87,27 +95,35 @@ func (c *Component) Update(args component.Arguments) error {
})
}

// Always trigger immediate check when Update is called
select {
case c.targetsChanged <- struct{}{}:
default:
}

return nil
}

// Run satisfies the component interface.
func (c *Component) Run(ctx context.Context) error {
update := func() {
c.mut.Lock()
defer c.mut.Unlock()

paths := c.getWatchedFiles()
// The component node checks to see if exports have actually changed.
c.opts.OnStateChange(discovery.Exports{Targets: paths})
}
// Trigger initial check
update()
defer c.watchDog.Stop()
for {
select {
case <-c.targetsChanged:
// When we get a signal that we have new targets we will get all watched files and
// reset the timer.
c.mut.Lock()
c.watchDog.Reset(c.args.SyncPeriod)
targets := c.getWatchedFiles()
c.mut.Unlock()
c.opts.OnStateChange(discovery.Exports{Targets: targets})
case <-c.watchDog.C:
// This triggers a check for any new paths, along with pushing new targets.
update()
// If we have not received a signal that we have new targets watch job will periodically
// get all files that we should watch.
c.mut.Lock()
targets := c.getWatchedFiles()
c.mut.Unlock()
c.opts.OnStateChange(discovery.Exports{Targets: targets})
case <-ctx.Done():
return nil
}
Expand Down
Loading