Skip to content

Commit 33dd17c

Browse files
committed
fix: trigger the sync job whenever the component is updated with new
targets
1 parent 17dd668 commit 33dd17c

File tree

2 files changed

+40
-21
lines changed

2 files changed

+40
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ Main (unreleased)
4949

5050
- Schedule new path targets faster in `loki.source.file`. (@kalleep)
5151

52+
- `local.file_match` will now trigger it's sync job when the component is updated with new targets. (@kalleep)
53+
5254
### Bugfixes
5355

5456
- Stop `loki.source.kubernetes` discarding log lines with duplicate timestamps. (@ciaranj)

internal/component/local/file_match/file.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,23 @@ var _ component.Component = (*Component)(nil)
3737
type Component struct {
3838
opts component.Options
3939

40-
mut sync.RWMutex
41-
args Arguments
42-
watches []watch
43-
watchDog *time.Ticker
40+
mut sync.Mutex
41+
args Arguments
42+
watches []watch
43+
watchDog *time.Ticker
44+
targetsChanged chan struct{}
4445
}
4546

4647
// New creates a new local.file_match component.
4748
func New(o component.Options, args Arguments) (*Component, error) {
4849
c := &Component{
4950
opts: o,
50-
mut: sync.RWMutex{},
51+
mut: sync.Mutex{},
5152
args: args,
5253
watches: make([]watch, 0),
5354
watchDog: time.NewTicker(args.SyncPeriod),
55+
// Buffered channel to avoid blocking
56+
targetsChanged: make(chan struct{}, 1),
5457
}
5558

5659
if err := c.Update(args); err != nil {
@@ -73,11 +76,16 @@ func (c *Component) Update(args component.Arguments) error {
7376
c.mut.Lock()
7477
defer c.mut.Unlock()
7578

79+
newArgs := args.(Arguments)
80+
7681
// Check to see if our ticker timer needs to be reset.
77-
if args.(Arguments).SyncPeriod != c.args.SyncPeriod {
78-
c.watchDog.Reset(c.args.SyncPeriod)
82+
if newArgs.SyncPeriod != c.args.SyncPeriod {
83+
c.watchDog.Reset(newArgs.SyncPeriod)
7984
}
80-
c.args = args.(Arguments)
85+
86+
c.args = newArgs
87+
88+
// Rebuild watches
8189
c.watches = c.watches[:0]
8290
for _, v := range c.args.PathTargets {
8391
c.watches = append(c.watches, watch{
@@ -87,34 +95,42 @@ func (c *Component) Update(args component.Arguments) error {
8795
})
8896
}
8997

98+
// Always trigger immediate check when Update is called
99+
select {
100+
case c.targetsChanged <- struct{}{}:
101+
default:
102+
}
103+
90104
return nil
91105
}
92106

93107
// Run satisfies the component interface.
94108
func (c *Component) Run(ctx context.Context) error {
95-
update := func() {
96-
c.mut.Lock()
97-
defer c.mut.Unlock()
98-
99-
paths := c.getWatchedFiles()
100-
// The component node checks to see if exports have actually changed.
101-
c.opts.OnStateChange(discovery.Exports{Targets: paths})
102-
}
103-
// Trigger initial check
104-
update()
105109
defer c.watchDog.Stop()
106110
for {
107111
select {
112+
case <-c.targetsChanged:
113+
// When we get a signal that we have new targets we will get all watched files and
114+
// reset the timer.
115+
c.mut.Lock()
116+
c.watchDog.Reset(c.args.SyncPeriod)
117+
targets := c.getWatchedFiles(true)
118+
c.mut.Unlock()
119+
c.opts.OnStateChange(discovery.Exports{Targets: targets})
108120
case <-c.watchDog.C:
109-
// This triggers a check for any new paths, along with pushing new targets.
110-
update()
121+
// If we have not received a signal that we have new targets watch job will periodically
122+
// get all files that we should watch.
123+
c.mut.Lock()
124+
targets := c.getWatchedFiles(false)
125+
c.mut.Unlock()
126+
c.opts.OnStateChange(discovery.Exports{Targets: targets})
111127
case <-ctx.Done():
112128
return nil
113129
}
114130
}
115131
}
116132

117-
func (c *Component) getWatchedFiles() []discovery.Target {
133+
func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target {
118134
paths := make([]discovery.Target, 0)
119135
// See if there is anything new we need to check.
120136
for _, w := range c.watches {
@@ -126,3 +142,4 @@ func (c *Component) getWatchedFiles() []discovery.Target {
126142
}
127143
return paths
128144
}
145+

0 commit comments

Comments
 (0)