Skip to content

Commit f4db0fc

Browse files
authored
Merge branch 'main' into feature/helm-add-lifecycle-for-index-gateway
2 parents 5920524 + f46ef0e commit f4db0fc

File tree

3 files changed

+562
-81
lines changed

3 files changed

+562
-81
lines changed

pkg/engine/compat.go

Lines changed: 152 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ import (
99
"github.com/prometheus/prometheus/model/labels"
1010
"github.com/prometheus/prometheus/promql"
1111

12+
"github.com/grafana/loki/pkg/push"
13+
1214
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
1315
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1416
"github.com/grafana/loki/v3/pkg/logproto"
1517
"github.com/grafana/loki/v3/pkg/logqlmodel"
1618
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
1719
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
18-
19-
"github.com/grafana/loki/pkg/push"
2020
)
2121

2222
type ResultBuilder interface {
@@ -33,119 +33,192 @@ var (
3333

3434
func newStreamsResultBuilder() *streamsResultBuilder {
3535
return &streamsResultBuilder{
36-
data: make(logqlmodel.Streams, 0),
37-
streams: make(map[string]int),
36+
data: make(logqlmodel.Streams, 0),
37+
streams: make(map[string]int),
38+
rowBuilders: nil,
3839
}
3940
}
4041

4142
// streamsResultBuilder accumulates log entries decoded from Arrow records and
// groups them into streams keyed by the string form of their label set.
type streamsResultBuilder struct {
4243
// streams maps a stream's label string to the index of that stream in data.
streams map[string]int
4344
// data holds the collected streams, in the order they were first seen.
data logqlmodel.Streams
4445
// count is the number of entries appended across all streams.
count int
45-
}
46-
47-
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
48-
for row := range int(rec.NumRows()) {
49-
stream, entry := b.collectRow(rec, row)
5046

51-
// Ignore rows that don't have stream labels, log line, or timestamp
52-
if stream.IsEmpty() || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
53-
continue
54-
}
47+
// rowBuilders is a scratch buffer with one element per row of the record
// being collected. It is resized by ensureRowBuilders and each element is
// reset after its row has been consumed, so it can be reused across calls.
48+
rowBuilders []rowBuilder
49+
}
5550

56-
// Add the entry to the result builder
57-
key := stream.String()
58-
idx, ok := b.streams[key]
59-
if !ok {
60-
idx = len(b.data)
61-
b.streams[key] = idx
62-
b.data = append(b.data, push.Stream{Labels: key})
63-
}
64-
b.data[idx].Entries = append(b.data[idx].Entries, entry)
65-
b.count++
66-
}
51+
// rowBuilder holds the values decoded so far for a single row while a record
// is being converted column by column.
type rowBuilder struct {
52+
// timestamp is the entry timestamp; the zero value means "not set".
timestamp time.Time
53+
// line is the log line; the empty string means "not set".
line string
54+
// lbsBuilder collects every label of the row (label, metadata and parsed
// columns); its result is used as the stream key.
lbsBuilder *labels.Builder
55+
// metadataBuilder collects the values reported as structured metadata.
metadataBuilder *labels.Builder
56+
// parsedBuilder collects the values reported as parsed labels.
parsedBuilder *labels.Builder
6757
}
6858

69-
func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) {
70-
var entry logproto.Entry
71-
lbs := labels.NewBuilder(labels.EmptyLabels())
72-
metadata := labels.NewBuilder(labels.EmptyLabels())
73-
parsed := labels.NewBuilder(labels.EmptyLabels())
59+
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
60+
numRows := int(rec.NumRows())
61+
if numRows == 0 {
62+
return
63+
}
7464

65+
// let's say we have the following log entries in rec:
66+
// - {labelenv="prod-1", metadatatrace="123-1", parsed="v1"} ts1 line 1
67+
// - {labelenv="prod-2", metadatatrace="123-2", parsed="v2"} ts2 line 2
68+
// - {labelenv="prod-3", metadatatrace="123-3", parsed="v3"} ts3 line 3
69+
// we pre-initialize slices to store column values for all the rows, e.g.:
70+
// rows | 1 | 2 | 3 | ...
71+
// ==============+=========+=========+=========+====
72+
// timestamps | r1 ts | r2 ts | r3 ts | ...
73+
// lines | r1 line | r2 line | r3 line | ...
74+
// ...
75+
// We iterate over the columns and convert the values to our format column by column, e.g.,
76+
// first all the timestamps, then all the log lines, etc.
77+
// After all the values are collected and converted we transform the columnar representation to a row-based one.
78+
79+
b.ensureRowBuilders(numRows)
80+
81+
// Convert arrow values to our format column by column
7582
for colIdx := range int(rec.NumCols()) {
7683
col := rec.Column(colIdx)
77-
// Ignore column values that are NULL or invalid
78-
if col.IsNull(i) || !col.IsValid(i) {
79-
continue
80-
}
8184

8285
field := rec.Schema().Field(colIdx)
8386
ident, err := semconv.ParseFQN(field.Name)
8487
if err != nil {
8588
continue
8689
}
87-
8890
shortName := ident.ShortName()
8991

90-
// Extract line
91-
if ident.Equal(semconv.ColumnIdentMessage) {
92-
entry.Line = col.(*array.String).Value(i)
93-
continue
92+
switch true {
93+
94+
// Log line
95+
case ident.Equal(semconv.ColumnIdentMessage):
96+
lineCol := col.(*array.String)
97+
forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) {
98+
b.rowBuilders[rowIdx].line = lineCol.Value(rowIdx)
99+
})
100+
101+
// Timestamp
102+
case ident.Equal(semconv.ColumnIdentTimestamp):
103+
tsCol := col.(*array.Timestamp)
104+
forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) {
105+
b.rowBuilders[rowIdx].timestamp = time.Unix(0, int64(tsCol.Value(rowIdx)))
106+
})
107+
108+
// One of the label columns
109+
case ident.ColumnType() == types.ColumnTypeLabel:
110+
labelCol := col.(*array.String)
111+
forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) {
112+
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, labelCol.Value(rowIdx))
113+
})
114+
115+
// One of the metadata columns
116+
case ident.ColumnType() == types.ColumnTypeMetadata:
117+
metadataCol := col.(*array.String)
118+
forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) {
119+
val := metadataCol.Value(rowIdx)
120+
b.rowBuilders[rowIdx].metadataBuilder.Set(shortName, val)
121+
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, val)
122+
})
123+
124+
// One of the parsed columns
125+
case ident.ColumnType() == types.ColumnTypeParsed:
126+
parsedCol := col.(*array.String)
127+
128+
// TODO: keep errors if --strict is set
129+
// These are reserved column names used to track parsing errors. We are dropping them until
130+
// we add support for --strict parsing.
131+
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
132+
continue
133+
}
134+
135+
forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) {
136+
parsedVal := parsedCol.Value(rowIdx)
137+
if b.rowBuilders[rowIdx].parsedBuilder.Get(shortName) != "" {
138+
return
139+
}
140+
b.rowBuilders[rowIdx].parsedBuilder.Set(shortName, parsedVal)
141+
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, parsedVal)
142+
if b.rowBuilders[rowIdx].metadataBuilder.Get(shortName) != "" {
143+
b.rowBuilders[rowIdx].metadataBuilder.Del(shortName)
144+
}
145+
})
94146
}
147+
}
95148

96-
// Extract timestamp
97-
if ident.Equal(semconv.ColumnIdentTimestamp) {
98-
entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
149+
// Convert columnar representation to a row-based one
150+
for rowIdx := range numRows {
151+
lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels()
152+
ts := b.rowBuilders[rowIdx].timestamp
153+
line := b.rowBuilders[rowIdx].line
154+
// Ignore rows that don't have stream labels, log line, or timestamp
155+
if line == "" || ts.IsZero() || lbs.IsEmpty() {
156+
b.resetRowBuilder(rowIdx)
99157
continue
100158
}
101159

102-
// Extract label
103-
if ident.ColumnType() == types.ColumnTypeLabel {
104-
switch arr := col.(type) {
105-
case *array.String:
106-
lbs.Set(shortName, arr.Value(i))
107-
}
108-
continue
160+
entry := logproto.Entry{
161+
Timestamp: ts,
162+
Line: line,
163+
StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].metadataBuilder.Labels()),
164+
Parsed: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].parsedBuilder.Labels()),
109165
}
166+
b.resetRowBuilder(rowIdx)
110167

111-
// Extract metadata
112-
if ident.ColumnType() == types.ColumnTypeMetadata {
113-
switch arr := col.(type) {
114-
case *array.String:
115-
metadata.Set(shortName, arr.Value(i))
116-
// include structured metadata in stream labels
117-
lbs.Set(shortName, arr.Value(i))
118-
}
119-
continue
168+
// Add entry to appropriate stream
169+
key := lbs.String()
170+
idx, ok := b.streams[key]
171+
if !ok {
172+
idx = len(b.data)
173+
b.streams[key] = idx
174+
b.data = append(b.data, push.Stream{Labels: key})
120175
}
176+
b.data[idx].Entries = append(b.data[idx].Entries, entry)
177+
b.count++
178+
}
179+
}
121180

122-
// Extract parsed
123-
if ident.ColumnType() == types.ColumnTypeParsed {
124-
switch arr := col.(type) {
125-
case *array.String:
126-
// TODO: keep errors if --strict is set
127-
// These are reserved column names used to track parsing errors. We are dropping them until
128-
// we add support for --strict parsing.
129-
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
130-
continue
131-
}
181+
func (b *streamsResultBuilder) ensureRowBuilders(newLen int) {
182+
if newLen == len(b.rowBuilders) {
183+
return
184+
}
132185

133-
if parsed.Get(shortName) != "" {
134-
continue
135-
}
186+
if newLen < len(b.rowBuilders) {
187+
// free not used items at the end of the slices so they can be GC-ed
188+
clear(b.rowBuilders[newLen:len(b.rowBuilders)])
189+
b.rowBuilders = b.rowBuilders[:newLen]
136190

137-
parsed.Set(shortName, arr.Value(i))
138-
lbs.Set(shortName, arr.Value(i))
139-
if metadata.Get(shortName) != "" {
140-
metadata.Del(shortName)
141-
}
142-
}
191+
return
192+
}
193+
194+
// newLen > buf.len
195+
numRowsToAdd := newLen - len(b.rowBuilders)
196+
oldLen := len(b.rowBuilders)
197+
b.rowBuilders = append(b.rowBuilders, make([]rowBuilder, numRowsToAdd)...)
198+
for i := oldLen; i < newLen; i++ {
199+
b.rowBuilders[i] = rowBuilder{
200+
lbsBuilder: labels.NewBuilder(labels.EmptyLabels()),
201+
metadataBuilder: labels.NewBuilder(labels.EmptyLabels()),
202+
parsedBuilder: labels.NewBuilder(labels.EmptyLabels()),
143203
}
144204
}
145-
entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
146-
entry.Parsed = logproto.FromLabelsToLabelAdapters(parsed.Labels())
205+
}
206+
207+
func (b *streamsResultBuilder) resetRowBuilder(i int) {
208+
b.rowBuilders[i].timestamp = time.Time{}
209+
b.rowBuilders[i].line = ""
210+
b.rowBuilders[i].lbsBuilder.Reset(labels.EmptyLabels())
211+
b.rowBuilders[i].metadataBuilder.Reset(labels.EmptyLabels())
212+
b.rowBuilders[i].parsedBuilder.Reset(labels.EmptyLabels())
213+
}
147214

148-
return lbs.Labels(), entry
215+
func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) {
216+
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
217+
if col.IsNull(rowIdx) {
218+
continue
219+
}
220+
f(rowIdx)
221+
}
149222
}
150223

151224
func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result {

0 commit comments

Comments
 (0)