Skip to content

Commit f46ef0e

Browse files
authored
chore(engine): Refactor streams result builder to use column-oriented processing (#19505)
Change streamsResultBuilder.CollectRecord() from row-by-row to column-by-column iteration. This better aligns with Arrow's columnar memory layout, improving CPU cache locality by reading contiguous memory regions. I also removed the IsValid checks, since IsNull checks are already present and IsValid appears to be the exact inverse of IsNull (based on this code). The slices and label builders holding intermediate row data are reused across CollectRecord calls.
1 parent 160dc2c commit f46ef0e

File tree

3 files changed

+562
-81
lines changed

3 files changed

+562
-81
lines changed

pkg/engine/compat.go

Lines changed: 152 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ import (
99
"github.com/prometheus/prometheus/model/labels"
1010
"github.com/prometheus/prometheus/promql"
1111

12+
"github.com/grafana/loki/pkg/push"
13+
1214
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
1315
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1416
"github.com/grafana/loki/v3/pkg/logproto"
1517
"github.com/grafana/loki/v3/pkg/logqlmodel"
1618
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
1719
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
18-
19-
"github.com/grafana/loki/pkg/push"
2020
)
2121

2222
type ResultBuilder interface {
@@ -33,119 +33,192 @@ var (
3333

3434
func newStreamsResultBuilder() *streamsResultBuilder {
3535
return &streamsResultBuilder{
36-
data: make(logqlmodel.Streams, 0),
37-
streams: make(map[string]int),
36+
data: make(logqlmodel.Streams, 0),
37+
streams: make(map[string]int),
38+
rowBuilders: nil,
3839
}
3940
}
4041

4142
type streamsResultBuilder struct {
4243
streams map[string]int
4344
data logqlmodel.Streams
4445
count int
45-
}
46-
47-
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
48-
for row := range int(rec.NumRows()) {
49-
stream, entry := b.collectRow(rec, row)
5046

51-
// Ignore rows that don't have stream labels, log line, or timestamp
52-
if stream.IsEmpty() || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
53-
continue
54-
}
47+
// buffer for rows
48+
rowBuilders []rowBuilder
49+
}
5550

56-
// Add the entry to the result builder
57-
key := stream.String()
58-
idx, ok := b.streams[key]
59-
if !ok {
60-
idx = len(b.data)
61-
b.streams[key] = idx
62-
b.data = append(b.data, push.Stream{Labels: key})
63-
}
64-
b.data[idx].Entries = append(b.data[idx].Entries, entry)
65-
b.count++
66-
}
51+
type rowBuilder struct {
52+
timestamp time.Time
53+
line string
54+
lbsBuilder *labels.Builder
55+
metadataBuilder *labels.Builder
56+
parsedBuilder *labels.Builder
6757
}
6858

69-
func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) {
70-
var entry logproto.Entry
71-
lbs := labels.NewBuilder(labels.EmptyLabels())
72-
metadata := labels.NewBuilder(labels.EmptyLabels())
73-
parsed := labels.NewBuilder(labels.EmptyLabels())
59+
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
60+
numRows := int(rec.NumRows())
61+
if numRows == 0 {
62+
return
63+
}
7464

65+
// let's say we have the following log entries in rec:
66+
// - {labelenv="prod-1", metadatatrace="123-1", parsed="v1"} ts1 line 1
67+
// - {labelenv="prod-2", metadatatrace="123-2", parsed="v2"} ts2 line 2
68+
// - {labelenv="prod-3", metadatatrace="123-3", parsed="v3"} ts3 line 3
69+
// we pre-initialize slices to store column values for all the rows, e.g.:
70+
// rows | 1 | 2 | 3 | ...
71+
// ==============+=========+=========+=========+====
72+
// timestamps | r1 ts | r2 ts | r3 ts | ...
73+
// lines | r1 line | r2 line | r3 line | ...
74+
// ...
75+
// We iterate over the columns and convert the values to our format column by column, e.g.,
76+
// first all the timestamps, then all the log lines, etc.
77+
// After all the values are collected and converted we transform the columnar representation to a row-based one.
78+
79+
b.ensureRowBuilders(numRows)
80+
81+
// Convert arrow values to our format column by column
7582
for colIdx := range int(rec.NumCols()) {
7683
col := rec.Column(colIdx)
77-
// Ignore column values that are NULL or invalid
78-
if col.IsNull(i) || !col.IsValid(i) {
79-
continue
80-
}
8184

8285
field := rec.Schema().Field(colIdx)
8386
ident, err := semconv.ParseFQN(field.Name)
8487
if err != nil {
8588
continue
8689
}
87-
8890
shortName := ident.ShortName()
8991

90-
// Extract line
91-
if ident.Equal(semconv.ColumnIdentMessage) {
92-
entry.Line = col.(*array.String).Value(i)
93-
continue
92+
switch true {
93+
94+
// Log line
95+
case ident.Equal(semconv.ColumnIdentMessage):
96+
lineCol := col.(*array.String)
97+
forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) {
98+
b.rowBuilders[rowIdx].line = lineCol.Value(rowIdx)
99+
})
100+
101+
// Timestamp
102+
case ident.Equal(semconv.ColumnIdentTimestamp):
103+
tsCol := col.(*array.Timestamp)
104+
forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) {
105+
b.rowBuilders[rowIdx].timestamp = time.Unix(0, int64(tsCol.Value(rowIdx)))
106+
})
107+
108+
// One of the label columns
109+
case ident.ColumnType() == types.ColumnTypeLabel:
110+
labelCol := col.(*array.String)
111+
forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) {
112+
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, labelCol.Value(rowIdx))
113+
})
114+
115+
// One of the metadata columns
116+
case ident.ColumnType() == types.ColumnTypeMetadata:
117+
metadataCol := col.(*array.String)
118+
forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) {
119+
val := metadataCol.Value(rowIdx)
120+
b.rowBuilders[rowIdx].metadataBuilder.Set(shortName, val)
121+
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, val)
122+
})
123+
124+
// One of the parsed columns
125+
case ident.ColumnType() == types.ColumnTypeParsed:
126+
parsedCol := col.(*array.String)
127+
128+
// TODO: keep errors if --strict is set
129+
// These are reserved column names used to track parsing errors. We are dropping them until
130+
// we add support for --strict parsing.
131+
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
132+
continue
133+
}
134+
135+
forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) {
136+
parsedVal := parsedCol.Value(rowIdx)
137+
if b.rowBuilders[rowIdx].parsedBuilder.Get(shortName) != "" {
138+
return
139+
}
140+
b.rowBuilders[rowIdx].parsedBuilder.Set(shortName, parsedVal)
141+
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, parsedVal)
142+
if b.rowBuilders[rowIdx].metadataBuilder.Get(shortName) != "" {
143+
b.rowBuilders[rowIdx].metadataBuilder.Del(shortName)
144+
}
145+
})
94146
}
147+
}
95148

96-
// Extract timestamp
97-
if ident.Equal(semconv.ColumnIdentTimestamp) {
98-
entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
149+
// Convert columnar representation to a row-based one
150+
for rowIdx := range numRows {
151+
lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels()
152+
ts := b.rowBuilders[rowIdx].timestamp
153+
line := b.rowBuilders[rowIdx].line
154+
// Ignore rows that don't have stream labels, log line, or timestamp
155+
if line == "" || ts.IsZero() || lbs.IsEmpty() {
156+
b.resetRowBuilder(rowIdx)
99157
continue
100158
}
101159

102-
// Extract label
103-
if ident.ColumnType() == types.ColumnTypeLabel {
104-
switch arr := col.(type) {
105-
case *array.String:
106-
lbs.Set(shortName, arr.Value(i))
107-
}
108-
continue
160+
entry := logproto.Entry{
161+
Timestamp: ts,
162+
Line: line,
163+
StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].metadataBuilder.Labels()),
164+
Parsed: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].parsedBuilder.Labels()),
109165
}
166+
b.resetRowBuilder(rowIdx)
110167

111-
// Extract metadata
112-
if ident.ColumnType() == types.ColumnTypeMetadata {
113-
switch arr := col.(type) {
114-
case *array.String:
115-
metadata.Set(shortName, arr.Value(i))
116-
// include structured metadata in stream labels
117-
lbs.Set(shortName, arr.Value(i))
118-
}
119-
continue
168+
// Add entry to appropriate stream
169+
key := lbs.String()
170+
idx, ok := b.streams[key]
171+
if !ok {
172+
idx = len(b.data)
173+
b.streams[key] = idx
174+
b.data = append(b.data, push.Stream{Labels: key})
120175
}
176+
b.data[idx].Entries = append(b.data[idx].Entries, entry)
177+
b.count++
178+
}
179+
}
121180

122-
// Extract parsed
123-
if ident.ColumnType() == types.ColumnTypeParsed {
124-
switch arr := col.(type) {
125-
case *array.String:
126-
// TODO: keep errors if --strict is set
127-
// These are reserved column names used to track parsing errors. We are dropping them until
128-
// we add support for --strict parsing.
129-
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
130-
continue
131-
}
181+
func (b *streamsResultBuilder) ensureRowBuilders(newLen int) {
182+
if newLen == len(b.rowBuilders) {
183+
return
184+
}
132185

133-
if parsed.Get(shortName) != "" {
134-
continue
135-
}
186+
if newLen < len(b.rowBuilders) {
187+
// free not used items at the end of the slices so they can be GC-ed
188+
clear(b.rowBuilders[newLen:len(b.rowBuilders)])
189+
b.rowBuilders = b.rowBuilders[:newLen]
136190

137-
parsed.Set(shortName, arr.Value(i))
138-
lbs.Set(shortName, arr.Value(i))
139-
if metadata.Get(shortName) != "" {
140-
metadata.Del(shortName)
141-
}
142-
}
191+
return
192+
}
193+
194+
// newLen > buf.len
195+
numRowsToAdd := newLen - len(b.rowBuilders)
196+
oldLen := len(b.rowBuilders)
197+
b.rowBuilders = append(b.rowBuilders, make([]rowBuilder, numRowsToAdd)...)
198+
for i := oldLen; i < newLen; i++ {
199+
b.rowBuilders[i] = rowBuilder{
200+
lbsBuilder: labels.NewBuilder(labels.EmptyLabels()),
201+
metadataBuilder: labels.NewBuilder(labels.EmptyLabels()),
202+
parsedBuilder: labels.NewBuilder(labels.EmptyLabels()),
143203
}
144204
}
145-
entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
146-
entry.Parsed = logproto.FromLabelsToLabelAdapters(parsed.Labels())
205+
}
206+
207+
func (b *streamsResultBuilder) resetRowBuilder(i int) {
208+
b.rowBuilders[i].timestamp = time.Time{}
209+
b.rowBuilders[i].line = ""
210+
b.rowBuilders[i].lbsBuilder.Reset(labels.EmptyLabels())
211+
b.rowBuilders[i].metadataBuilder.Reset(labels.EmptyLabels())
212+
b.rowBuilders[i].parsedBuilder.Reset(labels.EmptyLabels())
213+
}
147214

148-
return lbs.Labels(), entry
215+
func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) {
216+
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
217+
if col.IsNull(rowIdx) {
218+
continue
219+
}
220+
f(rowIdx)
221+
}
149222
}
150223

151224
func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result {

0 commit comments

Comments (0)