diff --git a/api/gen/proto/go/types/v1/types.pb.go b/api/gen/proto/go/types/v1/types.pb.go index 5696865900..8bb7a3c0f6 100644 --- a/api/gen/proto/go/types/v1/types.pb.go +++ b/api/gen/proto/go/types/v1/types.pb.go @@ -26,6 +26,7 @@ type TimeSeriesAggregationType int32 const ( TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM TimeSeriesAggregationType = 0 TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE TimeSeriesAggregationType = 1 + TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE TimeSeriesAggregationType = 2 ) // Enum value maps for TimeSeriesAggregationType. @@ -33,10 +34,12 @@ var ( TimeSeriesAggregationType_name = map[int32]string{ 0: "TIME_SERIES_AGGREGATION_TYPE_SUM", 1: "TIME_SERIES_AGGREGATION_TYPE_AVERAGE", + 2: "TIME_SERIES_AGGREGATION_TYPE_RATE", } TimeSeriesAggregationType_value = map[string]int32{ "TIME_SERIES_AGGREGATION_TYPE_SUM": 0, "TIME_SERIES_AGGREGATION_TYPE_AVERAGE": 1, + "TIME_SERIES_AGGREGATION_TYPE_RATE": 2, } ) @@ -1124,24 +1127,26 @@ var file_types_v1_types_proto_rawDesc = string([]byte{ 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x6e, 0x65, 0x77, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x11, 0x6e, 0x65, 0x77, 0x65, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x54, - 0x69, 0x6d, 0x65, 0x2a, 0x6b, 0x0a, 0x19, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x65, 0x72, 0x69, 0x65, - 0x73, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x24, 0x0a, 0x20, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, - 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, - 0x5f, 0x53, 0x55, 0x4d, 0x10, 0x00, 0x12, 0x28, 0x0a, 0x24, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, - 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x56, 0x45, 0x52, 0x41, 0x47, 0x45, 0x10, 0x01, - 0x42, 0x9b, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, - 0x31, 0x42, 0x0a, 0x54, 0x79, 0x70, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, - 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, - 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, - 0x79, 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x79, 0x70, 0x65, 0x73, 0x76, 0x31, 0xa2, - 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x56, 0x31, - 0xca, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x54, 0x79, - 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, 0x65, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x6d, 0x65, 0x2a, 0x92, 0x01, 0x0a, 0x19, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x65, 0x72, 0x69, + 0x65, 0x73, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x24, 0x0a, 0x20, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, + 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x53, 0x55, 0x4d, 0x10, 0x00, 0x12, 0x28, 0x0a, 0x24, 0x54, 0x49, 0x4d, 0x45, 0x5f, + 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, + 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x56, 0x45, 0x52, 0x41, 0x47, 0x45, 0x10, + 0x01, 0x12, 0x25, 0x0a, 0x21, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, + 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x52, 0x41, 0x54, 0x45, 0x10, 0x02, 0x42, 0x9b, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, + 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x42, 0x0a, 0x54, 0x79, 0x70, 0x65, 0x73, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, + 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, + 0x54, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, + 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, + 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, + 0x65, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( diff --git a/api/openapiv2/gen/phlare.swagger.json b/api/openapiv2/gen/phlare.swagger.json index af52dc673d..6fe425ccd6 100644 --- a/api/openapiv2/gen/phlare.swagger.json +++ b/api/openapiv2/gen/phlare.swagger.json @@ -2387,7 +2387,8 @@ "type": "string", "enum": [ "TIME_SERIES_AGGREGATION_TYPE_SUM", - "TIME_SERIES_AGGREGATION_TYPE_AVERAGE" + "TIME_SERIES_AGGREGATION_TYPE_AVERAGE", + "TIME_SERIES_AGGREGATION_TYPE_RATE" ], "default": "TIME_SERIES_AGGREGATION_TYPE_SUM" }, diff --git a/api/types/v1/types.proto b/api/types/v1/types.proto index 574b83ec24..bb3c4e37db 100644 --- a/api/types/v1/types.proto +++ b/api/types/v1/types.proto @@ -76,6 +76,7 @@ message BlockCompaction { enum TimeSeriesAggregationType { TIME_SERIES_AGGREGATION_TYPE_SUM = 0; TIME_SERIES_AGGREGATION_TYPE_AVERAGE = 1; + TIME_SERIES_AGGREGATION_TYPE_RATE = 2; } // StackTraceSelector is used for filtering stack traces by locations. diff --git a/pkg/experiment/query_backend/query_time_series.go b/pkg/experiment/query_backend/query_time_series.go index d3ee6e74c9..800d0fd35a 100644 --- a/pkg/experiment/query_backend/query_time_series.go +++ b/pkg/experiment/query_backend/query_time_series.go @@ -3,13 +3,11 @@ package query_backend import ( "strings" "sync" - "time" "github.com/grafana/dskit/runutil" "github.com/parquet-go/parquet-go" queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1" - typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/pkg/experiment/block" phlaremodel "github.com/grafana/pyroscope/pkg/model" parquetquery "github.com/grafana/pyroscope/pkg/phlaredb/query" @@ -111,7 +109,7 @@ func newTimeSeriesAggregator(req *queryv1.InvokeRequest) aggregator { func (a *timeSeriesAggregator) aggregate(report *queryv1.Report) error { r := report.TimeSeries a.init.Do(func() { - a.series = phlaremodel.NewTimeSeriesMerger(true) + a.series = phlaremodel.NewTimeSeriesMerger(false) a.query = r.Query.CloneVT() }) a.series.MergeTimeSeries(r.TimeSeries) @@ -119,22 +117,10 @@ func (a *timeSeriesAggregator) aggregate(report *queryv1.Report) error { } func (a *timeSeriesAggregator) build() *queryv1.Report { - // TODO(kolesnikovae): Average aggregation should be implemented in - // the way that it can be distributed (count + sum), and should be done - // at "aggregate" call. - sum := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM - stepMilli := time.Duration(a.query.GetStep() * float64(time.Second)).Milliseconds() - seriesIterator := phlaremodel.NewTimeSeriesMergeIterator(a.series.TimeSeries()) return &queryv1.Report{ TimeSeries: &queryv1.TimeSeriesReport{ - Query: a.query, - TimeSeries: phlaremodel.RangeSeries( - seriesIterator, - a.startTime, - a.endTime, - stepMilli, - &sum, - ), + Query: a.query, + TimeSeries: a.series.TimeSeries(), }, } } diff --git a/pkg/frontend/read_path/query_frontend/query_select_time_series.go b/pkg/frontend/read_path/query_frontend/query_select_time_series.go index 233203e48b..2cfa8305da 100644 --- a/pkg/frontend/read_path/query_frontend/query_select_time_series.go +++ b/pkg/frontend/read_path/query_frontend/query_select_time_series.go @@ -65,6 +65,9 @@ func (q *QueryFrontend) SelectSeries( if report == nil { return connect.NewResponse(&querierv1.SelectSeriesResponse{}), nil } - series := phlaremodel.TopSeries(report.TimeSeries.TimeSeries, int(c.Msg.GetLimit())) + + it := phlaremodel.NewTimeSeriesMergeIterator(report.TimeSeries.TimeSeries) + aggregatedSeries := phlaremodel.RangeSeries(it, c.Msg.Start, c.Msg.End, stepMs, c.Msg.Aggregation) + series := phlaremodel.TopSeries(aggregatedSeries, int(c.Msg.GetLimit())) return connect.NewResponse(&querierv1.SelectSeriesResponse{Series: series}), nil } diff --git a/pkg/model/time_series.go b/pkg/model/time_series.go index 318eb2f86e..76e0ccbf97 100644 --- a/pkg/model/time_series.go +++ b/pkg/model/time_series.go @@ -69,14 +69,23 @@ type TimeSeriesAggregator interface { GetTimestamp() int64 } -func NewTimeSeriesAggregator(aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator { - if aggregation == nil { - return &sumTimeSeriesAggregator{ts: -1} +func NewTimeSeriesAggregator(stepDurationSec float64, aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator { + // Default to sum for backward compatibility + aggregationType := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM + if aggregation != nil { + aggregationType = *aggregation } - if *aggregation == typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE { + + switch aggregationType { + case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE: + return &rateTimeSeriesAggregator{ts: -1, stepDurationSec: stepDurationSec} + case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE: return &avgTimeSeriesAggregator{ts: -1} + case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM: + return &sumTimeSeriesAggregator{ts: -1} + default: + return &sumTimeSeriesAggregator{ts: -1} } - return &sumTimeSeriesAggregator{ts: -1} } type sumTimeSeriesAggregator struct { @@ -142,6 +151,39 @@ func (a *avgTimeSeriesAggregator) GetAndReset() *typesv1.Point { func (a *avgTimeSeriesAggregator) IsEmpty() bool { return a.ts == -1 } func (a *avgTimeSeriesAggregator) GetTimestamp() int64 { return a.ts } +// rateTimeSeriesAggregator normalizes cumulative profile values by step duration to produce rates +type rateTimeSeriesAggregator struct { + ts int64 + sum float64 + stepDurationSec float64 + annotations []*typesv1.ProfileAnnotation +} + +func (a *rateTimeSeriesAggregator) Add(ts int64, point *TimeSeriesValue) { + a.ts = ts + a.sum += point.Value + a.annotations = append(a.annotations, point.Annotations...) +} + +func (a *rateTimeSeriesAggregator) GetAndReset() *typesv1.Point { + tsCopy := a.ts + // Normalize cumulative values by step duration to produce rates + normalizedValue := a.sum / a.stepDurationSec + annotationsCopy := make([]*typesv1.ProfileAnnotation, len(a.annotations)) + copy(annotationsCopy, a.annotations) + a.ts = -1 + a.sum = 0 + a.annotations = a.annotations[:0] + return &typesv1.Point{ + Timestamp: tsCopy, + Value: normalizedValue, + Annotations: annotationsCopy, + } +} + +func (a *rateTimeSeriesAggregator) IsEmpty() bool { return a.ts == -1 } +func (a *rateTimeSeriesAggregator) GetTimestamp() int64 { return a.ts } + // RangeSeries aggregates profiles into series. // Series contains points spaced by step from start to end. // Profiles from the same step are aggregated into one point. @@ -149,6 +191,7 @@ func RangeSeries(it iter.Iterator[TimeSeriesValue], start, end, step int64, aggr defer it.Close() seriesMap := make(map[uint64]*typesv1.Series) aggregators := make(map[uint64]TimeSeriesAggregator) + stepDurationSec := float64(step) / 1000.0 if !it.Next() { return nil @@ -161,7 +204,7 @@ Outer: point := it.At() aggregator, ok := aggregators[point.LabelsHash] if !ok { - aggregator = NewTimeSeriesAggregator(aggregation) + aggregator = NewTimeSeriesAggregator(stepDurationSec, aggregation) aggregators[point.LabelsHash] = aggregator } if point.Ts > currentStep { diff --git a/pkg/model/time_series_test.go b/pkg/model/time_series_test.go index 9961755603..255e0424c7 100644 --- a/pkg/model/time_series_test.go +++ b/pkg/model/time_series_test.go @@ -6,11 +6,13 @@ import ( typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/pkg/iter" "github.com/grafana/pyroscope/pkg/testhelper" + + "github.com/stretchr/testify/assert" ) func Test_RangeSeriesSum(t *testing.T) { - seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Labels() - seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Labels() + seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Set("__type__", "contentions").Labels() + seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Set("__type__", "contentions").Labels() for _, tc := range []struct { name string in []TimeSeriesValue @@ -19,15 +21,16 @@ func Test_RangeSeriesSum(t *testing.T) { { name: "single series", in: []TimeSeriesValue{ - {Ts: 1, Value: 1}, - {Ts: 1, Value: 1}, - {Ts: 2, Value: 2}, - {Ts: 3, Value: 3}, - {Ts: 4, Value: 4}, - {Ts: 5, Value: 5, Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, + {Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 2, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 3, Value: 3, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 4, Value: 4, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 5, Value: 5, Lbs: seriesA, LabelsHash: seriesA.Hash(), Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, }, out: []*typesv1.Series{ { + Labels: seriesA, Points: []*typesv1.Point{ {Timestamp: 1, Value: 2, Annotations: []*typesv1.ProfileAnnotation{}}, {Timestamp: 2, Value: 2, Annotations: []*typesv1.ProfileAnnotation{}}, @@ -83,8 +86,8 @@ func Test_RangeSeriesSum(t *testing.T) { } func Test_RangeSeriesAvg(t *testing.T) { - seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Labels() - seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Labels() + seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Set("__type__", "inuse_objects").Labels() + seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Set("__type__", "inuse_objects").Labels() for _, tc := range []struct { name string in []TimeSeriesValue @@ -93,15 +96,16 @@ func Test_RangeSeriesAvg(t *testing.T) { { name: "single series", in: []TimeSeriesValue{ - {Ts: 1, Value: 1}, - {Ts: 1, Value: 2}, - {Ts: 2, Value: 2}, - {Ts: 2, Value: 3}, - {Ts: 3, Value: 4}, - {Ts: 4, Value: 5, Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, + {Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 1, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 2, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 2, Value: 3, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 3, Value: 4, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 4, Value: 5, Lbs: seriesA, LabelsHash: seriesA.Hash(), Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, }, out: []*typesv1.Series{ { + Labels: seriesA, Points: []*typesv1.Point{ {Timestamp: 1, Value: 1.5, Annotations: []*typesv1.ProfileAnnotation{}}, // avg of 1 and 2 {Timestamp: 2, Value: 2.5, Annotations: []*typesv1.ProfileAnnotation{}}, // avg of 2 and 3 @@ -154,3 +158,96 @@ func Test_RangeSeriesAvg(t *testing.T) { }) } } + +func Test_RangeSeriesRateNormalization(t *testing.T) { + cpuLabels := NewLabelsBuilder(nil).Set("__type__", "cpu").Labels() + + testData := []TimeSeriesValue{ + {Ts: 0, Value: 0, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=0: start + {Ts: 15000, Value: 30, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=15s: 30s CPU + {Ts: 30000, Value: 0, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=30s: reset + {Ts: 45000, Value: 30, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=45s: 30s CPU again + } + + // Test with 15 second steps (step = 15000ms) - creates 4 bucket + in15s := iter.NewSliceIterator(testData) + rateAggregation := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE + out15s := RangeSeries(in15s, 0, 60000, 15000, &rateAggregation) + + // Test with 30 second steps (step = 30000ms) - creates 2 buckets + in30s := iter.NewSliceIterator(testData) + out30s := RangeSeries(in30s, 0, 60000, 30000, &rateAggregation) + + var total15s, total30s float64 + for _, series := range out15s { + for _, point := range series.Points { + total15s += point.Value * 15.0 + } + } + + for _, series := range out30s { + for _, point := range series.Points { + total30s += point.Value * 30.0 + } + } + + assert.Equal(t, total15s, total30s) + assert.Equal(t, 60.0, total15s) + assert.Equal(t, 60.0, total30s) +} + +func Test_NewTimeSeriesAggregatorSelection(t *testing.T) { + stepDuration := 5.0 + + tests := []struct { + name string + aggregation *typesv1.TimeSeriesAggregationType + expectedType string + description string + }{ + { + name: "No aggregation defaults to sum for backward compatibility", + aggregation: nil, + expectedType: "sum", + }, + { + name: "Explicit sum aggregation", + aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM}[0], + expectedType: "sum", + }, + { + name: "Explicit rate aggregation", + aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE}[0], + expectedType: "rate", + }, + { + name: "Explicit average aggregation", + aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE}[0], + expectedType: "avg", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + aggregator := NewTimeSeriesAggregator(stepDuration, tt.aggregation) + actualType := getAggregatorType(t, aggregator) + if actualType != tt.expectedType { + t.Errorf("Expected %s aggregator, got %s.", tt.expectedType, actualType) + } + }) + } +} + +func getAggregatorType(t *testing.T, agg TimeSeriesAggregator) string { + t.Helper() + switch agg.(type) { + case *sumTimeSeriesAggregator: + return "sum" + case *avgTimeSeriesAggregator: + return "avg" + case *rateTimeSeriesAggregator: + return "rate" + default: + return "unknown" + } +} diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 53e54a2855..38883ed0cb 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -143,6 +143,8 @@ func (q *QueryHandlers) Render(w http.ResponseWriter, req *http.Request) { aggregation = typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM case "avg": aggregation = typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE + case "rate": + aggregation = typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE } } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index dd6c66ef27..9aaa3a9992 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -509,12 +509,15 @@ func TestSelectSeries(t *testing.T) { {true, "WithBlockHints"}, } { t.Run(tc.name, func(t *testing.T) { + foobarlabels := phlaremodel.NewLabelsBuilder(nil).Set("foo", "bar").Labels() + req := connect.NewRequest(&querierv1.SelectSeriesRequest{ LabelSelector: `{app="foo"}`, ProfileTypeID: "memory:inuse_space:bytes:space:byte", Start: 0, End: 2, Step: 0.001, + // No aggregation specified - uses default (sum) for backward compatibility }) bidi1 := newFakeBidiClientSeries([]*ingestv1.ProfileSets{ {