From 592052495c2e467295ecf2e9793ecaf6a24ee755 Mon Sep 17 00:00:00 2001 From: Novian Deny Date: Thu, 23 Oct 2025 10:06:45 +0700 Subject: [PATCH 1/2] feat(helm): add ability to specify index-gateway container lifecycle --- docs/sources/setup/install/helm/reference.md | 10 ++++++++++ production/helm/loki/CHANGELOG.md | 2 ++ .../index-gateway/statefulset-index-gateway.yaml | 4 ++++ production/helm/loki/values.yaml | 2 ++ 4 files changed, 18 insertions(+) diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md index 209b6d6e754de..da77181cec2cd 100644 --- a/docs/sources/setup/install/helm/reference.md +++ b/docs/sources/setup/install/helm/reference.md @@ -5467,6 +5467,7 @@ null }, "initContainers": [], "joinMemberlist": true, + "lifecycle": {}, "maxUnavailable": null, "nodeSelector": {}, "persistence": { @@ -5645,6 +5646,15 @@ null
 true
 
+ + + + indexGateway.lifecycle + object + Lifecycle for the index-gateway container +
+{}
+
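For reference, a values override that uses the new field could look like the following sketch (the preStop hook and its sleep duration are illustrative assumptions, not part of this change; the block is passed verbatim to the container's lifecycle field):

indexGateway:
  lifecycle:
    preStop:
      exec:
        command: ["/bin/sh", "-c", "sleep 10"]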
diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 730af1ac6f66d..ce8cda434460e 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,8 @@ Entries should include a reference to the pull request that introduced the chang ## Unreleased +- [ENHANCEMENT] Add the ability to specify index-gateway container lifecycle. [#19573](https://github.com/grafana/loki/pull/19573) + ## 6.43.0 - [BREAKING] **Loki UI has been completely removed from the Helm chart.** The experimental Loki UI has been moved to a [Grafana Plugin] (https://github.com/grafana/loki-operational-ui). Enabling the UI in the Helm chart will now only enable the APIs needed by the plugin, and will host them on the querier. The gateway will now forward all UI requests to the queriers. Users who previously had `loki.ui.enabled: true` should remove this configuration and migrate to the Grafana Loki plugin for UI functionality. [#19390](https://github.com/grafana/loki/pull/19390) diff --git a/production/helm/loki/templates/index-gateway/statefulset-index-gateway.yaml b/production/helm/loki/templates/index-gateway/statefulset-index-gateway.yaml index b50bf66bba06c..52d4c225dd3d9 100644 --- a/production/helm/loki/templates/index-gateway/statefulset-index-gateway.yaml +++ b/production/helm/loki/templates/index-gateway/statefulset-index-gateway.yaml @@ -130,6 +130,10 @@ spec: {{- end }} resources: {{- toYaml .Values.indexGateway.resources | nindent 12 }} + {{- with .Values.indexGateway.lifecycle }} + lifecycle: + {{- toYaml . | nindent 12 }} + {{- end }} {{- if .Values.indexGateway.extraContainers }} {{- toYaml .Values.indexGateway.extraContainers | nindent 8}} {{- end }} diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index bc72cd974457f..c0f33a959b07b 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -2593,6 +2593,8 @@ indexGateway: initContainers: [] # -- Grace period to allow the index-gateway to shutdown before it is killed. terminationGracePeriodSeconds: 300 + # -- Lifecycle for the index-gateway container + lifecycle: {} # -- Affinity for index-gateway pods. # @default -- Hard node anti-affinity # The value will be passed through tpl. From 4a8b7802ac5af4c4dd6ef60b9ce62ab0cd01d33b Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Thu, 23 Oct 2025 11:00:46 +0200 Subject: [PATCH 2/2] chore(engine): Refactor streams result builder to use column-oriented processing (#19505) Change streamsResultBuilder.CollectRecord() from row-by-row to column-by-column iteration. This better aligns with Arrow's columnar memory layout, improving CPU cache locality by reading contiguous memory regions. I also remove the IsValid checks, given that IsNull checks are present. IsValid appears to be the exact opposite of IsNull (based on this code). The slices and label builders for intermediate row data are reused between CollectRecord calls.
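To illustrate the access pattern described above, here is a minimal sketch (the fillRows helper and string-only handling are hypothetical; this is not the actual CollectRecord implementation, which dispatches on semconv column identifiers):

package sketch

import (
	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
)

// fillRows is an illustrative sketch only: it visits each column once and,
// within it, each non-null row, so reads stay within one contiguous Arrow
// buffer at a time instead of touching every column for every row.
func fillRows(rec arrow.Record, rows [][]string) {
	numRows := int(rec.NumRows())
	for colIdx := 0; colIdx < int(rec.NumCols()); colIdx++ {
		col, ok := rec.Column(colIdx).(*array.String)
		if !ok {
			continue // non-string columns (e.g. timestamps) are handled separately
		}
		for rowIdx := 0; rowIdx < numRows; rowIdx++ {
			if col.IsNull(rowIdx) {
				continue
			}
			rows[rowIdx] = append(rows[rowIdx], col.Value(rowIdx))
		}
	}
}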
--- pkg/engine/compat.go | 231 +++++++++++++++++++---------- pkg/engine/compat_bench_test.go | 165 +++++++++++++++++++++ pkg/engine/compat_test.go | 247 +++++++++++++++++++++++++++++++- 3 files changed, 562 insertions(+), 81 deletions(-) create mode 100644 pkg/engine/compat_bench_test.go diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 25e637227cadc..7a92eb44d42ff 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -9,14 +9,14 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" - - "github.com/grafana/loki/pkg/push" ) type ResultBuilder interface { @@ -33,8 +33,9 @@ var ( func newStreamsResultBuilder() *streamsResultBuilder { return &streamsResultBuilder{ - data: make(logqlmodel.Streams, 0), - streams: make(map[string]int), + data: make(logqlmodel.Streams, 0), + streams: make(map[string]int), + rowBuilders: nil, } } @@ -42,110 +43,182 @@ type streamsResultBuilder struct { streams map[string]int data logqlmodel.Streams count int -} - -func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { - for row := range int(rec.NumRows()) { - stream, entry := b.collectRow(rec, row) - // Ignore rows that don't have stream labels, log line, or timestamp - if stream.IsEmpty() || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) { - continue - } + // buffer for rows + rowBuilders []rowBuilder +} - // Add the entry to the result builder - key := stream.String() - idx, ok := b.streams[key] - if !ok { - idx = len(b.data) - b.streams[key] = idx - b.data = append(b.data, push.Stream{Labels: key}) - } - b.data[idx].Entries = append(b.data[idx].Entries, entry) - b.count++ - } +type rowBuilder struct { + timestamp time.Time + line string + lbsBuilder *labels.Builder + metadataBuilder *labels.Builder + parsedBuilder *labels.Builder } -func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) { - var entry logproto.Entry - lbs := labels.NewBuilder(labels.EmptyLabels()) - metadata := labels.NewBuilder(labels.EmptyLabels()) - parsed := labels.NewBuilder(labels.EmptyLabels()) +func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { + numRows := int(rec.NumRows()) + if numRows == 0 { + return + } + // let's say we have the following log entries in rec: + // - {labelenv="prod-1", metadatatrace="123-1", parsed="v1"} ts1 line 1 + // - {labelenv="prod-2", metadatatrace="123-2", parsed="v2"} ts2 line 2 + // - {labelenv="prod-3", metadatatrace="123-3", parsed="v3"} ts3 line 3 + // we pre-initialize slices to store column values for all the rows, e.g.: + // rows | 1 | 2 | 3 | ... + // ==============+=========+=========+=========+==== + // timestamps | r1 ts | r2 ts | r3 ts | ... + // lines | r1 line | r2 line | r3 line | ... + // ... + // We iterate over the columns and convert the values to our format column by column, e.g., + // first all the timestamps, then all the log lines, etc. + // After all the values are collected and converted we transform the columnar representation to a row-based one. 
+ + b.ensureRowBuilders(numRows) + + // Convert arrow values to our format column by column for colIdx := range int(rec.NumCols()) { col := rec.Column(colIdx) - // Ignore column values that are NULL or invalid - if col.IsNull(i) || !col.IsValid(i) { - continue - } field := rec.Schema().Field(colIdx) ident, err := semconv.ParseFQN(field.Name) if err != nil { continue } - shortName := ident.ShortName() - // Extract line - if ident.Equal(semconv.ColumnIdentMessage) { - entry.Line = col.(*array.String).Value(i) - continue + switch true { + + // Log line + case ident.Equal(semconv.ColumnIdentMessage): + lineCol := col.(*array.String) + forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) { + b.rowBuilders[rowIdx].line = lineCol.Value(rowIdx) + }) + + // Timestamp + case ident.Equal(semconv.ColumnIdentTimestamp): + tsCol := col.(*array.Timestamp) + forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) { + b.rowBuilders[rowIdx].timestamp = time.Unix(0, int64(tsCol.Value(rowIdx))) + }) + + // One of the label columns + case ident.ColumnType() == types.ColumnTypeLabel: + labelCol := col.(*array.String) + forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) { + b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, labelCol.Value(rowIdx)) + }) + + // One of the metadata columns + case ident.ColumnType() == types.ColumnTypeMetadata: + metadataCol := col.(*array.String) + forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) { + val := metadataCol.Value(rowIdx) + b.rowBuilders[rowIdx].metadataBuilder.Set(shortName, val) + b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, val) + }) + + // One of the parsed columns + case ident.ColumnType() == types.ColumnTypeParsed: + parsedCol := col.(*array.String) + + // TODO: keep errors if --strict is set + // These are reserved column names used to track parsing errors. We are dropping them until + // we add support for --strict parsing. 
+ if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails { + continue + } + + forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) { + parsedVal := parsedCol.Value(rowIdx) + if b.rowBuilders[rowIdx].parsedBuilder.Get(shortName) != "" { + return + } + b.rowBuilders[rowIdx].parsedBuilder.Set(shortName, parsedVal) + b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, parsedVal) + if b.rowBuilders[rowIdx].metadataBuilder.Get(shortName) != "" { + b.rowBuilders[rowIdx].metadataBuilder.Del(shortName) + } + }) } + } - // Extract timestamp - if ident.Equal(semconv.ColumnIdentTimestamp) { - entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i))) + // Convert columnar representation to a row-based one + for rowIdx := range numRows { + lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels() + ts := b.rowBuilders[rowIdx].timestamp + line := b.rowBuilders[rowIdx].line + // Ignore rows that don't have stream labels, log line, or timestamp + if line == "" || ts.IsZero() || lbs.IsEmpty() { + b.resetRowBuilder(rowIdx) continue } - // Extract label - if ident.ColumnType() == types.ColumnTypeLabel { - switch arr := col.(type) { - case *array.String: - lbs.Set(shortName, arr.Value(i)) - } - continue + entry := logproto.Entry{ + Timestamp: ts, + Line: line, + StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].metadataBuilder.Labels()), + Parsed: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].parsedBuilder.Labels()), } + b.resetRowBuilder(rowIdx) - // Extract metadata - if ident.ColumnType() == types.ColumnTypeMetadata { - switch arr := col.(type) { - case *array.String: - metadata.Set(shortName, arr.Value(i)) - // include structured metadata in stream labels - lbs.Set(shortName, arr.Value(i)) - } - continue + // Add entry to appropriate stream + key := lbs.String() + idx, ok := b.streams[key] + if !ok { + idx = len(b.data) + b.streams[key] = idx + b.data = append(b.data, push.Stream{Labels: key}) } + b.data[idx].Entries = append(b.data[idx].Entries, entry) + b.count++ + } +} - // Extract parsed - if ident.ColumnType() == types.ColumnTypeParsed { - switch arr := col.(type) { - case *array.String: - // TODO: keep errors if --strict is set - // These are reserved column names used to track parsing errors. We are dropping them until - // we add support for --strict parsing. - if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails { - continue - } +func (b *streamsResultBuilder) ensureRowBuilders(newLen int) { + if newLen == len(b.rowBuilders) { + return + } - if parsed.Get(shortName) != "" { - continue - } + if newLen < len(b.rowBuilders) { + // free not used items at the end of the slices so they can be GC-ed + clear(b.rowBuilders[newLen:len(b.rowBuilders)]) + b.rowBuilders = b.rowBuilders[:newLen] - parsed.Set(shortName, arr.Value(i)) - lbs.Set(shortName, arr.Value(i)) - if metadata.Get(shortName) != "" { - metadata.Del(shortName) - } - } + return + } + + // newLen > buf.len + numRowsToAdd := newLen - len(b.rowBuilders) + oldLen := len(b.rowBuilders) + b.rowBuilders = append(b.rowBuilders, make([]rowBuilder, numRowsToAdd)...) 
+ for i := oldLen; i < newLen; i++ { + b.rowBuilders[i] = rowBuilder{ + lbsBuilder: labels.NewBuilder(labels.EmptyLabels()), + metadataBuilder: labels.NewBuilder(labels.EmptyLabels()), + parsedBuilder: labels.NewBuilder(labels.EmptyLabels()), } } - entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels()) - entry.Parsed = logproto.FromLabelsToLabelAdapters(parsed.Labels()) +} + +func (b *streamsResultBuilder) resetRowBuilder(i int) { + b.rowBuilders[i].timestamp = time.Time{} + b.rowBuilders[i].line = "" + b.rowBuilders[i].lbsBuilder.Reset(labels.EmptyLabels()) + b.rowBuilders[i].metadataBuilder.Reset(labels.EmptyLabels()) + b.rowBuilders[i].parsedBuilder.Reset(labels.EmptyLabels()) +} - return lbs.Labels(), entry +func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) { + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + if col.IsNull(rowIdx) { + continue + } + f(rowIdx) + } } func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result { diff --git a/pkg/engine/compat_bench_test.go b/pkg/engine/compat_bench_test.go new file mode 100644 index 0000000000000..f8b8fcd34ca7e --- /dev/null +++ b/pkg/engine/compat_bench_test.go @@ -0,0 +1,165 @@ +package engine + +import ( + "fmt" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/memory" + + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/util/arrowtest" +) + +func BenchmarkStreamsResultBuilder(b *testing.B) { + alloc := memory.NewGoAllocator() + + benchmarks := []struct { + name string + numRowsFirstRecord int + numRowsSecondRecord int + numLabels int + numMeta int + numParsed int + }{ + { + name: "records_equal_size", + numRowsFirstRecord: 1000, + numRowsSecondRecord: 1000, + numLabels: 10, + numMeta: 5, + numParsed: 8, + }, + { + name: "record_two_bigger", + numRowsFirstRecord: 1000, + numRowsSecondRecord: 2000, + numLabels: 10, + numMeta: 5, + numParsed: 8, + }, + { + name: "record_two_smaller", + numRowsFirstRecord: 1000, + numRowsSecondRecord: 500, + numLabels: 10, + numMeta: 5, + numParsed: 8, + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + + schema, labelIdents, metaIdents, parsedIdents := prepareSchema(bm.numLabels, bm.numMeta, bm.numParsed) + baseTime := time.Unix(0, 1620000000000000000).UTC() + + rows1 := generateRows(labelIdents, metaIdents, parsedIdents, bm.numRowsFirstRecord, baseTime) + record1 := rows1.Record(alloc, schema) + defer record1.Release() + + rows2 := generateRows(labelIdents, metaIdents, parsedIdents, bm.numRowsSecondRecord, baseTime) + record2 := rows2.Record(alloc, schema) + defer record2.Release() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + rb := newStreamsResultBuilder() + // Collect records twice on purpose to see how efficient CollectRecord is when the builder already has + // some data + rb.CollectRecord(record1) + rb.CollectRecord(record2) + + // Ensure the result is used to prevent compiler optimizations + if rb.Len() != bm.numRowsFirstRecord+bm.numRowsSecondRecord { + b.Fatalf("expected %d entries, got %d", bm.numRowsFirstRecord+bm.numRowsSecondRecord, rb.Len()) + } + } + }) + } +} + +func prepareSchema(numLabels int, numMeta int, numParsed int) (*arrow.Schema, []*semconv.Identifier, []*semconv.Identifier, []*semconv.Identifier) { + // Build schema + colTs := semconv.ColumnIdentTimestamp + colMsg := 
semconv.ColumnIdentMessage + fields := []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + } + + // Add label columns + labelIdents := make([]*semconv.Identifier, numLabels) + for i := 0; i < numLabels; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("label_%d", i), + types.ColumnTypeLabel, + types.Loki.String, + ) + labelIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + // Add metadata columns + metaIdents := make([]*semconv.Identifier, numMeta) + for i := 0; i < numMeta; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("meta_%d", i), + types.ColumnTypeMetadata, + types.Loki.String, + ) + metaIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + // Add parsed columns + parsedIdents := make([]*semconv.Identifier, numParsed) + for i := 0; i < numParsed; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("parsed_%d", i), + types.ColumnTypeParsed, + types.Loki.String, + ) + parsedIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + return arrow.NewSchema(fields, nil), labelIdents, metaIdents, parsedIdents +} + +func generateRows( + labelIdents []*semconv.Identifier, + metaIdents []*semconv.Identifier, + parsedIdents []*semconv.Identifier, + numRows int, + baseTime time.Time, +) arrowtest.Rows { + rows := make(arrowtest.Rows, numRows) + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + row := make(map[string]any) + row[semconv.ColumnIdentTimestamp.FQN()] = baseTime.Add(time.Duration(rowIdx) * time.Nanosecond) + row[semconv.ColumnIdentMessage.FQN()] = fmt.Sprintf("log line %d with some additional text to make it more realistic", rowIdx) + + // Add label values + for labelIdx, ident := range labelIdents { + row[ident.FQN()] = fmt.Sprintf("label_%d_value_%d", labelIdx, rowIdx%10) + } + + // Add metadata values + for metaIdx, ident := range metaIdents { + row[ident.FQN()] = fmt.Sprintf("meta_%d_value_%d", metaIdx, rowIdx%5) + } + + // Add parsed values + for parsedIdx, ident := range parsedIdents { + row[ident.FQN()] = fmt.Sprintf("parsed_%d_value_%d", parsedIdx, rowIdx%3) + } + + rows[rowIdx] = row + } + return rows +} diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go index 31eb77b1d5766..ca526cc7ee610 100644 --- a/pkg/engine/compat_test.go +++ b/pkg/engine/compat_test.go @@ -91,6 +91,13 @@ func TestStreamsResultBuilder(t *testing.T) { nil, ) rows := arrowtest.Rows{ + { + colTs.FQN(): nil, + colMsg.FQN(): "log line 0 (must be skipped)", + colEnv.FQN(): "dev", + colNs.FQN(): "loki-dev-001", + colTid.FQN(): "860e403fcf754312", + }, { colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(), colMsg.FQN(): "log line 1", @@ -126,6 +133,13 @@ func TestStreamsResultBuilder(t *testing.T) { colNs.FQN(): "loki-dev-002", colTid.FQN(): "0cf883f112ad239b", }, + { + colTs.FQN(): time.Unix(0, 1620000000000000006).UTC(), + colMsg.FQN(): "log line 6", + colEnv.FQN(): "dev", + colNs.FQN(): nil, + colTid.FQN(): "9de325g124ad230b", + }, } record := rows.Record(memory.DefaultAllocator, schema) @@ -137,11 +151,11 @@ func TestStreamsResultBuilder(t *testing.T) { err := collectResult(context.Background(), pipeline, builder) require.NoError(t, err) - require.Equal(t, 5, builder.Len()) + require.Equal(t, 6, builder.Len()) md, _ := metadata.NewContext(t.Context()) result := builder.Build(stats.Result{}, md) - require.Equal(t, 5, result.Data.(logqlmodel.Streams).Len()) + require.Equal(t, 6, result.Data.(logqlmodel.Streams).Len()) expected := 
logqlmodel.Streams{ push.Stream{ @@ -162,6 +176,12 @@ func TestStreamsResultBuilder(t *testing.T) { {Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "61330481e1e59b18")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, }, }, + push.Stream{ + Labels: labels.FromStrings("env", "dev", "traceID", "9de325g124ad230b").String(), + Entries: []logproto.Entry{ + {Line: "log line 6", Timestamp: time.Unix(0, 1620000000000000006), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "9de325g124ad230b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, push.Stream{ Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "40e50221e284b9d2").String(), Entries: []logproto.Entry{ @@ -177,6 +197,229 @@ func TestStreamsResultBuilder(t *testing.T) { } require.Equal(t, expected, result.Data.(logqlmodel.Streams)) }) + + t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) { + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String) + + schema := arrow.NewSchema( + []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + semconv.FieldFromIdent(colEnv, false), + }, + nil, + ) + + // First record: prod and dev streams + rows1 := arrowtest.Rows{ + { + colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(), + colMsg.FQN(): "log line 1", + colEnv.FQN(): "prod", + }, + { + colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(), + colMsg.FQN(): "log line 2", + colEnv.FQN(): "dev", + }, + } + record1 := rows1.Record(memory.DefaultAllocator, schema) + defer record1.Release() + + // Second record: prod and staging streams + rows2 := arrowtest.Rows{ + { + colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(), + colMsg.FQN(): "log line 3", + colEnv.FQN(): "prod", + }, + { + colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(), + colMsg.FQN(): "log line 4", + colEnv.FQN(): "staging", + }, + } + record2 := rows2.Record(memory.DefaultAllocator, schema) + defer record2.Release() + + builder := newStreamsResultBuilder() + + // Collect first record + builder.CollectRecord(record1) + require.Equal(t, 2, builder.Len(), "should have 2 entries after first record") + + // Collect second record + builder.CollectRecord(record2) + require.Equal(t, 4, builder.Len(), "should have 4 entries total after second record") + + md, _ := metadata.NewContext(t.Context()) + result := builder.Build(stats.Result{}, md) + streams := result.Data.(logqlmodel.Streams) + // Note: 3 unique streams (dev, prod, staging), but 4 total entries + // The prod stream has 2 entries (one from each record) + require.Equal(t, 3, len(streams), "should have 3 unique streams") + + // Verify stream grouping - prod stream should have entries from both records + expected := logqlmodel.Streams{ + push.Stream{ + Labels: labels.FromStrings("env", "dev").String(), + Entries: []logproto.Entry{ + {Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, + push.Stream{ + Labels: labels.FromStrings("env", "prod").String(), + Entries: []logproto.Entry{ + {Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: 
logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + {Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, + push.Stream{ + Labels: labels.FromStrings("env", "staging").String(), + Entries: []logproto.Entry{ + {Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, + } + require.Equal(t, expected, streams) + }) + + t.Run("buffer reuse with varying record sizes", func(t *testing.T) { + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String) + + schema := arrow.NewSchema( + []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + semconv.FieldFromIdent(colEnv, false), + }, + nil, + ) + + builder := newStreamsResultBuilder() + + // First record: 5 rows (buffer grows to 5) + rows1 := make(arrowtest.Rows, 5) + for i := 0; i < 5; i++ { + rows1[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "prod", + } + } + record1 := rows1.Record(memory.DefaultAllocator, schema) + builder.CollectRecord(record1) + record1.Release() + require.Equal(t, 5, builder.Len()) + require.Equal(t, 5, len(builder.rowBuilders), "buffer should have 5 rowBuilders") + + // Second record: 2 rows (buffer shrinks to 2) + rows2 := make(arrowtest.Rows, 2) + for i := 0; i < 2; i++ { + rows2[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "dev", + } + } + record2 := rows2.Record(memory.DefaultAllocator, schema) + builder.CollectRecord(record2) + record2.Release() + require.Equal(t, 7, builder.Len()) + require.Equal(t, 2, len(builder.rowBuilders), "buffer should shrink to 2 rowBuilders") + + // Third record: 10 rows (buffer grows to 10) + rows3 := make(arrowtest.Rows, 10) + for i := 0; i < 10; i++ { + rows3[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000020+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "staging", + } + } + record3 := rows3.Record(memory.DefaultAllocator, schema) + builder.CollectRecord(record3) + record3.Release() + require.Equal(t, 17, builder.Len()) + require.Equal(t, 10, len(builder.rowBuilders), "buffer should grow to 10 rowBuilders") + + // Verify all rowBuilders are properly initialized + for i := 0; i < len(builder.rowBuilders); i++ { + require.NotNil(t, builder.rowBuilders[i].lbsBuilder, "lbsBuilder should be initialized") + require.NotNil(t, builder.rowBuilders[i].metadataBuilder, "metadataBuilder should be initialized") + require.NotNil(t, builder.rowBuilders[i].parsedBuilder, "parsedBuilder should be initialized") + } + }) + + t.Run("empty records mixed with valid records", func(t *testing.T) { + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String) + + schema := arrow.NewSchema( + []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + semconv.FieldFromIdent(colEnv, false), + }, + nil, + ) + + builder := newStreamsResultBuilder() + + // First record: 3 valid 
rows + rows1 := make(arrowtest.Rows, 3) + for i := 0; i < 3; i++ { + rows1[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "prod", + } + } + record1 := rows1.Record(memory.DefaultAllocator, schema) + builder.CollectRecord(record1) + record1.Release() + require.Equal(t, 3, builder.Len()) + + // Second record: empty (0 rows) + rows2 := arrowtest.Rows{} + record2 := rows2.Record(memory.DefaultAllocator, schema) + builder.CollectRecord(record2) + record2.Release() + require.Equal(t, 3, builder.Len(), "empty record should not change count") + + // Third record: 2 valid rows + rows3 := make(arrowtest.Rows, 2) + for i := 0; i < 2; i++ { + rows3[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "dev", + } + } + record3 := rows3.Record(memory.DefaultAllocator, schema) + builder.CollectRecord(record3) + record3.Release() + require.Equal(t, 5, builder.Len(), "should have 5 total entries") + + // Verify final result + md, _ := metadata.NewContext(t.Context()) + result := builder.Build(stats.Result{}, md) + streams := result.Data.(logqlmodel.Streams) + // Note: 2 unique streams (prod with 3 entries, dev with 2 entries) = 5 total entries + require.Equal(t, 2, len(streams), "should have 2 unique streams") + + // Verify the streams have the correct number of entries + var totalEntries int + for _, stream := range streams { + totalEntries += len(stream.Entries) + } + require.Equal(t, 5, totalEntries, "should have 5 total entries across both streams") + }) } func TestVectorResultBuilder(t *testing.T) {