Skip to content

Commit a61ce24

Browse files
logs Add support for reading line by line in FIFO reader (#621)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add support for reading line by line in FIFO reader ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 032efa6 commit a61ce24

File tree

3 files changed

+132
-2
lines changed

3 files changed

+132
-2
lines changed

changes/20250530105438.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `logs` Add support for reading line by line in FIFO reader

utils/logs/fifo_logger.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@ package logs
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io"
8+
"iter"
79
"log"
810
"sync"
11+
"time"
12+
13+
"github.com/ARM-software/golang-utils/utils/parallelisation"
914
)
1015

16+
var _ Loggers = &FIFOLoggers{}
17+
1118
type FIFOWriter struct {
1219
io.WriteCloser
1320
mu sync.RWMutex
@@ -39,6 +46,68 @@ func (w *FIFOWriter) Read() string {
3946
return string(bytes)
4047
}
4148

49+
func (w *FIFOWriter) ReadLines(ctx context.Context) iter.Seq[string] {
50+
return func(yield func(string) bool) {
51+
var partial []byte
52+
for {
53+
if err := parallelisation.DetermineContextError(ctx); err != nil {
54+
return
55+
}
56+
57+
buf := func() []byte {
58+
w.mu.Lock()
59+
defer w.mu.Unlock()
60+
defer w.Logs.Reset()
61+
tmp := w.Logs.Bytes()
62+
buf := make([]byte, len(tmp))
63+
copy(buf, tmp)
64+
return buf
65+
}()
66+
67+
if len(buf) == 0 {
68+
if err := parallelisation.DetermineContextError(ctx); err != nil {
69+
if len(partial) > 0 {
70+
yield(string(partial))
71+
}
72+
return
73+
}
74+
75+
parallelisation.SleepWithContext(ctx, 50*time.Millisecond)
76+
continue
77+
}
78+
79+
if len(partial) > 0 {
80+
buf = append(partial, buf...)
81+
partial = nil
82+
}
83+
84+
for {
85+
idx := bytes.IndexByte(buf, '\n')
86+
if idx < 0 {
87+
break
88+
}
89+
line := buf[:idx]
90+
91+
if len(line) > 0 && line[len(line)-1] == '\r' {
92+
line = line[:len(line)-1]
93+
}
94+
buf = buf[idx+1:]
95+
if len(line) == 0 {
96+
continue
97+
}
98+
99+
if !yield(string(line)) {
100+
return
101+
}
102+
}
103+
104+
if len(buf) > 0 {
105+
partial = buf
106+
}
107+
}
108+
}
109+
}
110+
42111
type FIFOLoggers struct {
43112
GenericLoggers
44113
LogWriter FIFOWriter
@@ -52,6 +121,10 @@ func (l *FIFOLoggers) Read() string {
52121
return l.LogWriter.Read()
53122
}
54123

124+
func (l *FIFOLoggers) ReadLines(ctx context.Context) iter.Seq[string] {
125+
return l.LogWriter.ReadLines(ctx)
126+
}
127+
55128
// Close closes the logger
56129
func (l *FIFOLoggers) Close() (err error) {
57130
err = l.LogWriter.Close()

utils/logs/fifo_logger_test.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package logs
22

33
import (
4+
"context"
5+
"regexp"
46
"strings"
57
"testing"
8+
"time"
69

10+
"github.com/stretchr/testify/assert"
711
"github.com/stretchr/testify/require"
812
)
913

10-
func TestFIFOLogger(t *testing.T) {
14+
func TestFIFOLoggerRead(t *testing.T) {
1115
loggers, err := NewFIFOLogger("Test")
1216
require.NoError(t, err)
1317
testLog(t, loggers)
@@ -27,7 +31,7 @@ func TestFIFOLogger(t *testing.T) {
2731
require.Empty(t, contents)
2832
}
2933

30-
func TestPlainFIFOLogger(t *testing.T) {
34+
func TestPlainFIFOLoggerRead(t *testing.T) {
3135
loggers, err := NewPlainFIFOLogger()
3236
require.NoError(t, err)
3337
testLog(t, loggers)
@@ -46,3 +50,55 @@ func TestPlainFIFOLogger(t *testing.T) {
4650
contents = loggers.Read()
4751
require.Empty(t, contents)
4852
}
53+
54+
func TestFIFOLoggerReadlines(t *testing.T) {
55+
loggers, err := NewFIFOLogger("Test")
56+
require.NoError(t, err)
57+
testLog(t, loggers)
58+
loggers.LogError("Test err\n")
59+
loggers.Log("Test1")
60+
count := 0
61+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
62+
defer cancel()
63+
64+
var b strings.Builder
65+
for line := range loggers.ReadLines(ctx) {
66+
_, err := b.WriteString(line + "\n")
67+
require.NoError(t, err)
68+
count++
69+
}
70+
71+
assert.Regexp(t, regexp.MustCompile(`\[Test\] Error: .* .* Test err\n\[Test\] Output: .* .* Test1\n`), b.String())
72+
assert.Equal(t, 2, count)
73+
}
74+
75+
func TestPlainFIFOLoggerReadlines(t *testing.T) {
76+
loggers, err := NewPlainFIFOLogger()
77+
require.NoError(t, err)
78+
testLog(t, loggers)
79+
80+
go func() {
81+
time.Sleep(500 * time.Millisecond)
82+
loggers.LogError("Test err")
83+
loggers.Log("")
84+
time.Sleep(100 * time.Millisecond)
85+
loggers.Log("Test1")
86+
loggers.Log("\n\n\n")
87+
time.Sleep(200 * time.Millisecond)
88+
loggers.Log("Test2")
89+
}()
90+
91+
count := 0
92+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
93+
defer cancel()
94+
95+
var b strings.Builder
96+
for line := range loggers.ReadLines(ctx) {
97+
_, err := b.WriteString(line + "\n")
98+
require.NoError(t, err)
99+
count++
100+
}
101+
102+
assert.Equal(t, "Test err\nTest1\nTest2\n", b.String())
103+
assert.Equal(t, 3, count)
104+
}

0 commit comments

Comments
 (0)