Skip to content

Commit c580f93

Browse files
supervisor Add option to run supervisor a fixed number of times (#620)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add option to run supervisor a fixed number of times ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent d14e12c commit c580f93

File tree

7 files changed

+225
-1
lines changed

7 files changed

+225
-1
lines changed

changes/20250529160336.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `supervisor` Add option to run supervisor a fixed number of times

changes/20250529161638.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `supervisor` Add option to run a function after the supervisor stops

changes/20250529175329.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `logs` Add support for a FIFO logger that discards logs ater reading them

utils/logs/fifo_logger.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package logs
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"log"
8+
"sync"
9+
)
10+
11+
type FIFOWriter struct {
12+
io.WriteCloser
13+
mu sync.RWMutex
14+
Logs bytes.Buffer
15+
}
16+
17+
func (w *FIFOWriter) Write(p []byte) (n int, err error) {
18+
w.mu.RLock()
19+
defer w.mu.RUnlock()
20+
w.Logs.Write(p)
21+
return
22+
}
23+
24+
func (w *FIFOWriter) Close() (err error) {
25+
w.mu.Lock()
26+
defer w.mu.Unlock()
27+
w.Logs.Reset()
28+
return
29+
}
30+
31+
func (w *FIFOWriter) Read() string {
32+
w.mu.Lock()
33+
defer w.mu.Unlock()
34+
n := w.Logs.Len()
35+
if n == 0 {
36+
return ""
37+
}
38+
bytes := w.Logs.Next(n)
39+
return string(bytes)
40+
}
41+
42+
type FIFOLoggers struct {
43+
GenericLoggers
44+
LogWriter FIFOWriter
45+
}
46+
47+
func (l *FIFOLoggers) Check() error {
48+
return l.GenericLoggers.Check()
49+
}
50+
51+
func (l *FIFOLoggers) Read() string {
52+
return l.LogWriter.Read()
53+
}
54+
55+
// Close closes the logger
56+
func (l *FIFOLoggers) Close() (err error) {
57+
err = l.LogWriter.Close()
58+
if err != nil {
59+
return
60+
}
61+
err = l.GenericLoggers.Close()
62+
return
63+
}
64+
65+
// NewFIFOLogger creates a logger to a bytes buffer.
66+
// All messages (whether they are output or error) are merged together.
67+
// Once messages have been accessed they are gone
68+
func NewFIFOLogger(loggerSource string) (loggers *FIFOLoggers, err error) {
69+
loggers = &FIFOLoggers{
70+
LogWriter: FIFOWriter{},
71+
}
72+
loggers.GenericLoggers = GenericLoggers{
73+
Output: log.New(&loggers.LogWriter, fmt.Sprintf("[%v] Output: ", loggerSource), log.LstdFlags),
74+
Error: log.New(&loggers.LogWriter, fmt.Sprintf("[%v] Error: ", loggerSource), log.LstdFlags),
75+
}
76+
return
77+
}
78+
79+
// NewPlainFIFOLogger creates a logger to a bytes buffer with no extra flag, prefix or tag, just the logged text.
80+
// All messages (whether they are output or error) are merged together.
81+
// Once messages have been accessed they are gone
82+
func NewPlainFIFOLogger() (loggers *FIFOLoggers, err error) {
83+
loggers = &FIFOLoggers{
84+
LogWriter: FIFOWriter{},
85+
}
86+
loggers.GenericLoggers = GenericLoggers{
87+
Output: log.New(&loggers.LogWriter, "", 0),
88+
Error: log.New(&loggers.LogWriter, "", 0),
89+
}
90+
return
91+
}

utils/logs/fifo_logger_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package logs
2+
3+
import (
4+
"strings"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestFIFOLogger(t *testing.T) {
11+
loggers, err := NewFIFOLogger("Test")
12+
require.NoError(t, err)
13+
testLog(t, loggers)
14+
loggers.LogError("Test err")
15+
loggers.Log("Test1")
16+
contents := loggers.Read()
17+
require.NotEmpty(t, contents)
18+
require.True(t, strings.Contains(contents, "Test err"))
19+
require.True(t, strings.Contains(contents, "Test1"))
20+
loggers.Log("Test2")
21+
contents = loggers.Read()
22+
require.NotEmpty(t, contents)
23+
require.False(t, strings.Contains(contents, "Test err"))
24+
require.False(t, strings.Contains(contents, "Test1"))
25+
require.True(t, strings.Contains(contents, "Test2"))
26+
contents = loggers.Read()
27+
require.Empty(t, contents)
28+
}
29+
30+
func TestPlainFIFOLogger(t *testing.T) {
31+
loggers, err := NewPlainFIFOLogger()
32+
require.NoError(t, err)
33+
testLog(t, loggers)
34+
loggers.LogError("Test err")
35+
loggers.Log("Test1")
36+
contents := loggers.Read()
37+
require.NotEmpty(t, contents)
38+
require.True(t, strings.Contains(contents, "Test err"))
39+
require.True(t, strings.Contains(contents, "Test1"))
40+
loggers.Log("Test2")
41+
contents = loggers.Read()
42+
require.NotEmpty(t, contents)
43+
require.False(t, strings.Contains(contents, "Test err"))
44+
require.False(t, strings.Contains(contents, "Test1"))
45+
require.True(t, strings.Contains(contents, "Test2"))
46+
contents = loggers.Read()
47+
require.Empty(t, contents)
48+
}

utils/subprocess/supervisor/supervisor.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ARM-software/golang-utils/utils/commonerrors"
1111
"github.com/ARM-software/golang-utils/utils/parallelisation"
12+
"github.com/ARM-software/golang-utils/utils/safecast"
1213
"github.com/ARM-software/golang-utils/utils/subprocess"
1314
)
1415

@@ -19,8 +20,10 @@ type Supervisor struct {
1920
preStart func(context.Context) error
2021
postStart func(context.Context) error
2122
postStop func(context.Context, error) error
23+
postEnd func()
2224
haltingErrors []error
2325
restartDelay time.Duration
26+
count uint
2427
}
2528

2629
type SupervisorOption func(*Supervisor)
@@ -75,9 +78,29 @@ func WithRestartDelay(delay time.Duration) SupervisorOption {
7578
}
7679
}
7780

81+
// WithCount will run cause the supervisor to exit after 'count' executions.
82+
func WithCount[I safecast.INumber](count I) SupervisorOption {
83+
return func(s *Supervisor) {
84+
s.count = safecast.ToUint(count)
85+
}
86+
}
87+
88+
// WithPostEnd will run 'function' after the supervisor has stopped.
89+
// It does not take a context to ensure that it runs after a context has been cancelled.
90+
// It does not return an error as this could cause confusion with the other returned errors.
91+
func WithPostEnd(function func()) SupervisorOption {
92+
return func(s *Supervisor) {
93+
s.postEnd = function
94+
}
95+
}
96+
7897
// Run will run the supervisor and execute any of the command hooks. If it receives a halting error or the context is cancelled then it will exit
7998
func (s *Supervisor) Run(ctx context.Context) (err error) {
80-
for {
99+
if s.postEnd != nil {
100+
defer s.postEnd()
101+
}
102+
103+
for i := uint(0); s.count == 0 || i < s.count; i++ {
81104
err = parallelisation.DetermineContextError(ctx)
82105
if err != nil {
83106
return
@@ -141,4 +164,6 @@ func (s *Supervisor) Run(ctx context.Context) (err error) {
141164

142165
// restart process
143166
}
167+
168+
return
144169
}

utils/subprocess/supervisor/supervisor_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,4 +378,61 @@ func TestSupervisor(t *testing.T) {
378378
assert.NotEmpty(t, written)
379379
assert.Equal(t, string(written), "test\ntest123\n")
380380
})
381+
382+
t.Run("with count", func(t *testing.T) {
383+
if platform.IsWindows() {
384+
t.SkipNow()
385+
}
386+
387+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
388+
defer cancel()
389+
390+
logger, err := logs.NewLogrLogger(logstest.NewTestLogger(t), "Test")
391+
require.NoError(t, err)
392+
393+
counter := atomic.Uint64{}
394+
assert.Zero(t, counter.Load())
395+
396+
cmd, err := subprocess.New(ctx, logger, "starting", "success", "failed", "echo", "123")
397+
require.NoError(t, err)
398+
399+
runner := NewSupervisor(func(ctx context.Context) (*subprocess.Subprocess, error) {
400+
return cmd, nil
401+
}, WithPostStop(func(_ context.Context, _ error) error {
402+
_ = counter.Inc()
403+
return nil
404+
}), WithCount(3))
405+
406+
err = runner.Run(ctx)
407+
assert.NoError(t, err)
408+
assert.EqualValues(t, 3, counter.Load())
409+
})
410+
411+
t.Run("with post end", func(t *testing.T) {
412+
if platform.IsWindows() {
413+
t.SkipNow()
414+
}
415+
416+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
417+
defer cancel()
418+
419+
logger, err := logs.NewLogrLogger(logstest.NewTestLogger(t), "Test")
420+
require.NoError(t, err)
421+
422+
counter := atomic.Uint64{}
423+
assert.Zero(t, counter.Load())
424+
425+
cmd, err := subprocess.New(ctx, logger, "starting", "success", "failed", "echo", "123")
426+
require.NoError(t, err)
427+
428+
runner := NewSupervisor(func(ctx context.Context) (*subprocess.Subprocess, error) {
429+
return cmd, nil
430+
}, WithPostEnd(func() {
431+
_ = counter.Inc()
432+
}), WithCount(1))
433+
434+
err = runner.Run(ctx)
435+
assert.NoError(t, err)
436+
assert.EqualValues(t, 1, counter.Load())
437+
})
381438
}

0 commit comments

Comments
 (0)