Skip to content

Commit 6fe7c61

Browse files
gitforbitamelhusic
authored andcommitted
MEDIUM: watcher: fix race condition & plumbing stop for test
1 parent 2e1ec40 commit 6fe7c61

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed

consul/watcher.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ type Watcher struct {
6363
certCAPool *x509.CertPool
6464
leaf *certLeaf
6565

66-
update chan struct{}
67-
log Logger
66+
update chan struct{}
67+
shutdownCh chan struct{}
68+
log Logger
6869
}
6970

7071
// New builds a new watcher
@@ -73,10 +74,11 @@ func New(service string, consul *api.Client, log Logger) *Watcher {
7374
service: service,
7475
consul: consul,
7576

76-
C: make(chan Config),
77-
upstreams: make(map[string]*upstream),
78-
update: make(chan struct{}, 1),
79-
log: log,
77+
C: make(chan Config),
78+
upstreams: make(map[string]*upstream),
79+
update: make(chan struct{}, 1),
80+
shutdownCh: make(chan struct{}),
81+
log: log,
8082
}
8183
}
8284

@@ -189,9 +191,11 @@ func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
189191
go func() {
190192
index := uint64(0)
191193
for {
194+
w.lock.Lock()
192195
if u.done {
193196
return
194197
}
198+
w.lock.Unlock()
195199
nodes, meta, err := w.consul.Health().Connect(up.DestinationName, "", true, &api.QueryOptions{
196200
Datacenter: up.Datacenter,
197201
WaitTime: 10 * time.Minute,
@@ -331,6 +335,9 @@ func (w *Watcher) watchLeaf() {
331335
w.ready.Done()
332336
first = false
333337
}
338+
if w.isStopped() {
339+
return
340+
}
334341
}
335342
}
336343

@@ -361,6 +368,9 @@ func (w *Watcher) watchService(service string, handler func(first bool, srv *api
361368
}
362369

363370
first = false
371+
if w.isStopped() {
372+
return
373+
}
364374
}
365375
}
366376

@@ -405,6 +415,9 @@ func (w *Watcher) watchCA() {
405415
w.ready.Done()
406416
first = false
407417
}
418+
if w.isStopped() {
419+
return
420+
}
408421
}
409422
}
410423

@@ -492,3 +505,16 @@ func (w *Watcher) notifyChanged() {
492505
default:
493506
}
494507
}
508+
509+
func (w *Watcher) Stop() {
510+
close(w.shutdownCh)
511+
}
512+
513+
func (w *Watcher) isStopped() bool {
514+
select {
515+
case <-w.shutdownCh:
516+
return true
517+
default:
518+
return false
519+
}
520+
}

utils_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func startConnectService(t *testing.T, sd *lib.Shutdown, client *api.Client, reg
7070
errs <- err
7171
}
7272
}()
73+
watcher.Stop()
7374

7475
sourceHap := haproxy.New(client, watcher.C, haproxy.Options{
7576
EnableIntentions: true,

0 commit comments

Comments
 (0)