From a0bfd311ae208317b5260b60e961081ffb0e9025 Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Mon, 16 Jun 2025 12:37:35 +0200 Subject: [PATCH 1/2] fix: time series aggregation inconsistency across different time windows --- api/gen/proto/go/types/v1/types.pb.go | 41 ++++--- api/openapiv2/gen/phlare.swagger.json | 3 +- api/types/v1/types.proto | 1 + pkg/model/profile_types.go | 163 ++++++++++++++++++++++++++ pkg/model/time_series.go | 70 ++++++++++- pkg/model/time_series_test.go | 162 ++++++++++++++++++++++--- pkg/querier/http.go | 2 + 7 files changed, 401 insertions(+), 41 deletions(-) create mode 100644 pkg/model/profile_types.go diff --git a/api/gen/proto/go/types/v1/types.pb.go b/api/gen/proto/go/types/v1/types.pb.go index 5696865900..8bb7a3c0f6 100644 --- a/api/gen/proto/go/types/v1/types.pb.go +++ b/api/gen/proto/go/types/v1/types.pb.go @@ -26,6 +26,7 @@ type TimeSeriesAggregationType int32 const ( TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM TimeSeriesAggregationType = 0 TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE TimeSeriesAggregationType = 1 + TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE TimeSeriesAggregationType = 2 ) // Enum value maps for TimeSeriesAggregationType. @@ -33,10 +34,12 @@ var ( TimeSeriesAggregationType_name = map[int32]string{ 0: "TIME_SERIES_AGGREGATION_TYPE_SUM", 1: "TIME_SERIES_AGGREGATION_TYPE_AVERAGE", + 2: "TIME_SERIES_AGGREGATION_TYPE_RATE", } TimeSeriesAggregationType_value = map[string]int32{ "TIME_SERIES_AGGREGATION_TYPE_SUM": 0, "TIME_SERIES_AGGREGATION_TYPE_AVERAGE": 1, + "TIME_SERIES_AGGREGATION_TYPE_RATE": 2, } ) @@ -1124,24 +1127,26 @@ var file_types_v1_types_proto_rawDesc = string([]byte{ 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x6e, 0x65, 0x77, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x11, 0x6e, 0x65, 0x77, 0x65, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x54, - 0x69, 0x6d, 0x65, 0x2a, 0x6b, 0x0a, 0x19, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x65, 0x72, 0x69, 0x65, - 0x73, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x24, 0x0a, 0x20, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, - 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, - 0x5f, 0x53, 0x55, 0x4d, 0x10, 0x00, 0x12, 0x28, 0x0a, 0x24, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, - 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x56, 0x45, 0x52, 0x41, 0x47, 0x45, 0x10, 0x01, - 0x42, 0x9b, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, - 0x31, 0x42, 0x0a, 0x54, 0x79, 0x70, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, - 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, - 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, - 0x79, 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x79, 0x70, 0x65, 0x73, 0x76, 0x31, 0xa2, - 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x56, 0x31, - 0xca, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x54, 0x79, - 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, 0x65, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x6d, 0x65, 0x2a, 0x92, 0x01, 0x0a, 0x19, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x65, 0x72, 0x69, + 0x65, 0x73, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x24, 0x0a, 0x20, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, + 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x53, 0x55, 0x4d, 0x10, 0x00, 0x12, 0x28, 0x0a, 0x24, 0x54, 0x49, 0x4d, 0x45, 0x5f, + 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, + 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x56, 0x45, 0x52, 0x41, 0x47, 0x45, 0x10, + 0x01, 0x12, 0x25, 0x0a, 0x21, 0x54, 0x49, 0x4d, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, + 0x5f, 0x41, 0x47, 0x47, 0x52, 0x45, 0x47, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x52, 0x41, 0x54, 0x45, 0x10, 0x02, 0x42, 0x9b, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, + 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x42, 0x0a, 0x54, 0x79, 0x70, 0x65, 0x73, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, + 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, + 0x54, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, + 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, + 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, + 0x65, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( diff --git a/api/openapiv2/gen/phlare.swagger.json b/api/openapiv2/gen/phlare.swagger.json index 743df295e6..231837ada4 100644 --- a/api/openapiv2/gen/phlare.swagger.json +++ b/api/openapiv2/gen/phlare.swagger.json @@ -2349,7 +2349,8 @@ "type": "string", "enum": [ "TIME_SERIES_AGGREGATION_TYPE_SUM", - "TIME_SERIES_AGGREGATION_TYPE_AVERAGE" + "TIME_SERIES_AGGREGATION_TYPE_AVERAGE", + "TIME_SERIES_AGGREGATION_TYPE_RATE" ], "default": "TIME_SERIES_AGGREGATION_TYPE_SUM" }, diff --git a/api/types/v1/types.proto b/api/types/v1/types.proto index 574b83ec24..bb3c4e37db 100644 --- a/api/types/v1/types.proto +++ b/api/types/v1/types.proto @@ -76,6 +76,7 @@ message BlockCompaction { enum TimeSeriesAggregationType { TIME_SERIES_AGGREGATION_TYPE_SUM = 0; TIME_SERIES_AGGREGATION_TYPE_AVERAGE = 1; + TIME_SERIES_AGGREGATION_TYPE_RATE = 2; } // StackTraceSelector is used for filtering stack traces by locations. diff --git a/pkg/model/profile_types.go b/pkg/model/profile_types.go new file mode 100644 index 0000000000..35d724cf08 --- /dev/null +++ b/pkg/model/profile_types.go @@ -0,0 +1,163 @@ +package model + +import ( + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" +) + +// ProfileTypeInfo contains aggregation and semantic information for profile types +type ProfileTypeInfo struct { + Name string + Description string + Group string + Unit string + IsCumulative bool + AggregationType typesv1.TimeSeriesAggregationType +} + +// ProfilesTypeRegistry contains all known profile types +var ProfilesTypeRegistry = map[string]ProfileTypeInfo{ + "samples": { + Name: "samples", + Description: "Number of sampling events collected", + Group: "cpu", + Unit: "short", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, + }, + "cpu": { + Name: "cpu", + Description: "CPU time consumed", + Group: "cpu", + Unit: "ns", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "inuse_objects": { + Name: "inuse_objects", + Description: "Number of objects currently in use", + Group: "memory", + Unit: "short", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE, + }, + "inuse_space": { + Name: "inuse_space", + Description: "Size of memory currently in use", + Group: "memory", + Unit: "bytes", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE, + }, + "alloc_objects": { + Name: "alloc_objects", + Description: "Number of objects allocated", + Group: "memory", + Unit: "short", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "alloc_space": { + Name: "alloc_space", + Description: "Size of memory allocated in the heap", + Group: "memory", + Unit: "bytes", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "alloc_samples": { + Name: "alloc_samples", + Description: "Number of memory allocation samples during CPU time", + Group: "memory", + Unit: "short", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "alloc_size": { + Name: "alloc_size", + Description: "Size of memory allocated during CPU time", + Group: "memory", + Unit: "bytes", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "alloc_in_new_tlab_bytes": { + Name: "alloc_in_new_tlab_bytes", + Description: "Size of memory allocated inside Thread-Local Allocation Buffers", + Group: "memory", + Unit: "bytes", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "alloc_in_new_tlab_objects": { + Name: "alloc_in_new_tlab_objects", + Description: "Number of objects allocated inside Thread-Local Allocation Buffers", + Group: "memory", + Unit: "short", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "goroutine": { + Name: "goroutine", + Description: "Number of goroutines", + Group: "goroutine", + Unit: "short", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE, + }, + "contentions": { + Name: "contentions", + Description: "Number of lock contentions observed", + Group: "locks", + Unit: "short", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, + }, + "lock_count": { + Name: "lock_count", + Description: "Number of lock acquisitions attempted", + Group: "locks", + Unit: "short", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, + }, + "delay": { + Name: "delay", + Description: "Time spent in blocking delays", + Group: "locks", + Unit: "ns", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "lock_time": { + Name: "lock_time", + Description: "Cumulative time spent acquiring locks", + Group: "locks", + Unit: "ns", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + }, + "exceptions": { + Name: "exceptions", + Description: "Number of exceptions within the sampled CPU time", + Group: "exceptions", + Unit: "short", + IsCumulative: false, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, + }, +} + +// GetProfileTypeInfo returns aggregation information for a profile type +func GetProfileTypeInfo(profileType string) ProfileTypeInfo { + if info, exists := ProfilesTypeRegistry[profileType]; exists { + return info + } + + return ProfileTypeInfo{ + Name: profileType, + Description: "Unknown profile type", + Group: "unknown", + Unit: "unknown", + IsCumulative: true, + AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, + } +} diff --git a/pkg/model/time_series.go b/pkg/model/time_series.go index 318eb2f86e..b26f2e4e0f 100644 --- a/pkg/model/time_series.go +++ b/pkg/model/time_series.go @@ -69,14 +69,38 @@ type TimeSeriesAggregator interface { GetTimestamp() int64 } -func NewTimeSeriesAggregator(aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator { - if aggregation == nil { - return &sumTimeSeriesAggregator{ts: -1} +func NewTimeSeriesAggregator(labels []*typesv1.LabelPair, stepDurationSec float64, aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator { + profileType := getProfileTypeName(labels) + profileInfo := GetProfileTypeInfo(profileType) + + // If explicit aggregation type is provided, respect it for instant profiles only + if aggregation != nil && *aggregation == typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE { + // For instant profiles, use average aggregation + if !profileInfo.IsCumulative { + return &avgTimeSeriesAggregator{ts: -1} + } } - if *aggregation == typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE { + + switch profileInfo.AggregationType { + case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE: + return &rateTimeSeriesAggregator{ts: -1, stepDurationSec: stepDurationSec} + case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE: return &avgTimeSeriesAggregator{ts: -1} + case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM: + return &sumTimeSeriesAggregator{ts: -1} + default: + return &rateTimeSeriesAggregator{ts: -1, stepDurationSec: stepDurationSec} } - return &sumTimeSeriesAggregator{ts: -1} +} + +// getProfileTypeName extracts the profile type name from labels +func getProfileTypeName(labels []*typesv1.LabelPair) string { + for _, label := range labels { + if label.Name == "__type__" { + return label.Value + } + } + return "" } type sumTimeSeriesAggregator struct { @@ -142,6 +166,39 @@ func (a *avgTimeSeriesAggregator) GetAndReset() *typesv1.Point { func (a *avgTimeSeriesAggregator) IsEmpty() bool { return a.ts == -1 } func (a *avgTimeSeriesAggregator) GetTimestamp() int64 { return a.ts } +// rateTimeSeriesAggregator normalizes cumulative profile values by step duration to produce rates +type rateTimeSeriesAggregator struct { + ts int64 + sum float64 + stepDurationSec float64 + annotations []*typesv1.ProfileAnnotation +} + +func (a *rateTimeSeriesAggregator) Add(ts int64, point *TimeSeriesValue) { + a.ts = ts + a.sum += point.Value + a.annotations = append(a.annotations, point.Annotations...) +} + +func (a *rateTimeSeriesAggregator) GetAndReset() *typesv1.Point { + tsCopy := a.ts + // Normalize cumulative values by step duration to produce rates + normalizedValue := a.sum / a.stepDurationSec + annotationsCopy := make([]*typesv1.ProfileAnnotation, len(a.annotations)) + copy(annotationsCopy, a.annotations) + a.ts = -1 + a.sum = 0 + a.annotations = a.annotations[:0] + return &typesv1.Point{ + Timestamp: tsCopy, + Value: normalizedValue, + Annotations: annotationsCopy, + } +} + +func (a *rateTimeSeriesAggregator) IsEmpty() bool { return a.ts == -1 } +func (a *rateTimeSeriesAggregator) GetTimestamp() int64 { return a.ts } + // RangeSeries aggregates profiles into series. // Series contains points spaced by step from start to end. // Profiles from the same step are aggregated into one point. @@ -149,6 +206,7 @@ func RangeSeries(it iter.Iterator[TimeSeriesValue], start, end, step int64, aggr defer it.Close() seriesMap := make(map[uint64]*typesv1.Series) aggregators := make(map[uint64]TimeSeriesAggregator) + stepDurationSec := float64(step) / 1000.0 if !it.Next() { return nil @@ -161,7 +219,7 @@ Outer: point := it.At() aggregator, ok := aggregators[point.LabelsHash] if !ok { - aggregator = NewTimeSeriesAggregator(aggregation) + aggregator = NewTimeSeriesAggregator(point.Lbs, stepDurationSec, aggregation) aggregators[point.LabelsHash] = aggregator } if point.Ts > currentStep { diff --git a/pkg/model/time_series_test.go b/pkg/model/time_series_test.go index 9961755603..bd06018cf9 100644 --- a/pkg/model/time_series_test.go +++ b/pkg/model/time_series_test.go @@ -6,11 +6,13 @@ import ( typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/pkg/iter" "github.com/grafana/pyroscope/pkg/testhelper" + + "github.com/stretchr/testify/assert" ) func Test_RangeSeriesSum(t *testing.T) { - seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Labels() - seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Labels() + seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Set("__type__", "contentions").Labels() + seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Set("__type__", "contentions").Labels() for _, tc := range []struct { name string in []TimeSeriesValue @@ -19,15 +21,16 @@ func Test_RangeSeriesSum(t *testing.T) { { name: "single series", in: []TimeSeriesValue{ - {Ts: 1, Value: 1}, - {Ts: 1, Value: 1}, - {Ts: 2, Value: 2}, - {Ts: 3, Value: 3}, - {Ts: 4, Value: 4}, - {Ts: 5, Value: 5, Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, + {Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 2, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 3, Value: 3, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 4, Value: 4, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 5, Value: 5, Lbs: seriesA, LabelsHash: seriesA.Hash(), Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, }, out: []*typesv1.Series{ { + Labels: seriesA, Points: []*typesv1.Point{ {Timestamp: 1, Value: 2, Annotations: []*typesv1.ProfileAnnotation{}}, {Timestamp: 2, Value: 2, Annotations: []*typesv1.ProfileAnnotation{}}, @@ -83,8 +86,8 @@ func Test_RangeSeriesSum(t *testing.T) { } func Test_RangeSeriesAvg(t *testing.T) { - seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Labels() - seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Labels() + seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Set("__type__", "inuse_objects").Labels() + seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Set("__type__", "inuse_objects").Labels() for _, tc := range []struct { name string in []TimeSeriesValue @@ -93,15 +96,16 @@ func Test_RangeSeriesAvg(t *testing.T) { { name: "single series", in: []TimeSeriesValue{ - {Ts: 1, Value: 1}, - {Ts: 1, Value: 2}, - {Ts: 2, Value: 2}, - {Ts: 2, Value: 3}, - {Ts: 3, Value: 4}, - {Ts: 4, Value: 5, Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, + {Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 1, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 2, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 2, Value: 3, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 3, Value: 4, Lbs: seriesA, LabelsHash: seriesA.Hash()}, + {Ts: 4, Value: 5, Lbs: seriesA, LabelsHash: seriesA.Hash(), Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}}, }, out: []*typesv1.Series{ { + Labels: seriesA, Points: []*typesv1.Point{ {Timestamp: 1, Value: 1.5, Annotations: []*typesv1.ProfileAnnotation{}}, // avg of 1 and 2 {Timestamp: 2, Value: 2.5, Annotations: []*typesv1.ProfileAnnotation{}}, // avg of 2 and 3 @@ -154,3 +158,129 @@ func Test_RangeSeriesAvg(t *testing.T) { }) } } + +func Test_RangeSeriesRateNormalization(t *testing.T) { + cpuLabels := NewLabelsBuilder(nil).Set("__type__", "cpu").Labels() + + testData := []TimeSeriesValue{ + {Ts: 1000, Value: 50, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, + {Ts: 1000, Value: 50, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, + } + + // Test with 1 second steps (step = 1000ms) + in1s := iter.NewSliceIterator(testData) + out1s := RangeSeries(in1s, 1000, 1000, 1000, nil) + + // Test with 5 second steps (step = 5000ms) + in5s := iter.NewSliceIterator(testData) + out5s := RangeSeries(in5s, 1000, 1000, 5000, nil) + + if len(out1s) != 1 || len(out1s[0].Points) != 1 { + t.Fatal("Expected 1 series with 1 point for 1s test") + } + if len(out5s) != 1 || len(out5s[0].Points) != 1 { + t.Fatal("Expected 1 series with 1 point for 5s test") + } + + rate1s := out1s[0].Points[0].Value + rate5s := out5s[0].Points[0].Value + + if rate1s <= rate5s { + t.Errorf("Expected 1s rate (%f) to be higher than 5s rate (%f) due to normalization", rate1s, rate5s) + } + + expectedRate1s := 100.0 / 1.0 + expectedRate5s := 100.0 / 5.0 + + assert.Equal(t, expectedRate1s, rate1s) + assert.Equal(t, expectedRate5s, rate5s) +} + +func Test_NewTimeSeriesAggregatorSelection(t *testing.T) { + stepDuration := 5.0 + + tests := []struct { + name string + profileType string + expectedType string + description string + }{ + { + name: "CPU samples uses sum aggregation", + profileType: "samples", + expectedType: "sum", + description: "CPU samples are count of sampling events (instant)", + }, + { + name: "CPU time uses rate normalization", + profileType: "cpu", + expectedType: "rate", + description: "CPU time is cumulative - actual time consumed over duration", + }, + { + name: "Allocated objects uses rate normalization", + profileType: "alloc_objects", + expectedType: "rate", + description: "Cumulative allocation profiles need rate normalization", + }, + { + name: "Contentions use sum aggregation", + profileType: "contentions", + expectedType: "sum", + description: "Contention events are instant counts that should be summed", + }, + { + name: "Lock time uses rate normalization", + profileType: "lock_time", + expectedType: "rate", + description: "Lock time is cumulative duration that needs rate normalization", + }, + { + name: "In-use objects uses average aggregation", + profileType: "inuse_objects", + expectedType: "avg", + description: "Instant memory profiles should be averaged", + }, + { + name: "Goroutines use average aggregation", + profileType: "goroutine", + expectedType: "avg", + description: "Goroutine counts are instant values that should be averaged", + }, + { + name: "Unknown types default to rate normalization", + profileType: "unknown_profile_type", + expectedType: "rate", + description: "Unknown profile types default to cumulative for safety", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + labels := []*typesv1.LabelPair{ + {Name: "__type__", Value: tt.profileType}, + } + + aggregator := NewTimeSeriesAggregator(labels, stepDuration, nil) + actualType := getAggregatorType(aggregator) + + if actualType != tt.expectedType { + t.Errorf("Expected %s aggregator for %s, got %s. %s", + tt.expectedType, tt.profileType, actualType, tt.description) + } + }) + } +} + +func getAggregatorType(agg TimeSeriesAggregator) string { + switch agg.(type) { + case *sumTimeSeriesAggregator: + return "sum" + case *avgTimeSeriesAggregator: + return "avg" + case *rateTimeSeriesAggregator: + return "rate" + default: + return "unknown" + } +} diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 53e54a2855..38883ed0cb 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -143,6 +143,8 @@ func (q *QueryHandlers) Render(w http.ResponseWriter, req *http.Request) { aggregation = typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM case "avg": aggregation = typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE + case "rate": + aggregation = typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE } } From 9a3ed0f779d4a6410e2db79946c1ffa9d25d124b Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Thu, 19 Jun 2025 16:41:17 +0200 Subject: [PATCH 2/2] Address review suggestions --- .../query_backend/query_time_series.go | 20 +-- .../query_select_time_series.go | 5 +- pkg/model/profile_types.go | 163 ------------------ pkg/model/time_series.go | 31 +--- pkg/model/time_series_test.go | 107 ++++-------- pkg/querier/querier_test.go | 3 + 6 files changed, 55 insertions(+), 274 deletions(-) delete mode 100644 pkg/model/profile_types.go diff --git a/pkg/experiment/query_backend/query_time_series.go b/pkg/experiment/query_backend/query_time_series.go index d3ee6e74c9..800d0fd35a 100644 --- a/pkg/experiment/query_backend/query_time_series.go +++ b/pkg/experiment/query_backend/query_time_series.go @@ -3,13 +3,11 @@ package query_backend import ( "strings" "sync" - "time" "github.com/grafana/dskit/runutil" "github.com/parquet-go/parquet-go" queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1" - typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/grafana/pyroscope/pkg/experiment/block" phlaremodel "github.com/grafana/pyroscope/pkg/model" parquetquery "github.com/grafana/pyroscope/pkg/phlaredb/query" @@ -111,7 +109,7 @@ func newTimeSeriesAggregator(req *queryv1.InvokeRequest) aggregator { func (a *timeSeriesAggregator) aggregate(report *queryv1.Report) error { r := report.TimeSeries a.init.Do(func() { - a.series = phlaremodel.NewTimeSeriesMerger(true) + a.series = phlaremodel.NewTimeSeriesMerger(false) a.query = r.Query.CloneVT() }) a.series.MergeTimeSeries(r.TimeSeries) @@ -119,22 +117,10 @@ func (a *timeSeriesAggregator) aggregate(report *queryv1.Report) error { } func (a *timeSeriesAggregator) build() *queryv1.Report { - // TODO(kolesnikovae): Average aggregation should be implemented in - // the way that it can be distributed (count + sum), and should be done - // at "aggregate" call. - sum := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM - stepMilli := time.Duration(a.query.GetStep() * float64(time.Second)).Milliseconds() - seriesIterator := phlaremodel.NewTimeSeriesMergeIterator(a.series.TimeSeries()) return &queryv1.Report{ TimeSeries: &queryv1.TimeSeriesReport{ - Query: a.query, - TimeSeries: phlaremodel.RangeSeries( - seriesIterator, - a.startTime, - a.endTime, - stepMilli, - &sum, - ), + Query: a.query, + TimeSeries: a.series.TimeSeries(), }, } } diff --git a/pkg/frontend/read_path/query_frontend/query_select_time_series.go b/pkg/frontend/read_path/query_frontend/query_select_time_series.go index 233203e48b..2cfa8305da 100644 --- a/pkg/frontend/read_path/query_frontend/query_select_time_series.go +++ b/pkg/frontend/read_path/query_frontend/query_select_time_series.go @@ -65,6 +65,9 @@ func (q *QueryFrontend) SelectSeries( if report == nil { return connect.NewResponse(&querierv1.SelectSeriesResponse{}), nil } - series := phlaremodel.TopSeries(report.TimeSeries.TimeSeries, int(c.Msg.GetLimit())) + + it := phlaremodel.NewTimeSeriesMergeIterator(report.TimeSeries.TimeSeries) + aggregatedSeries := phlaremodel.RangeSeries(it, c.Msg.Start, c.Msg.End, stepMs, c.Msg.Aggregation) + series := phlaremodel.TopSeries(aggregatedSeries, int(c.Msg.GetLimit())) return connect.NewResponse(&querierv1.SelectSeriesResponse{Series: series}), nil } diff --git a/pkg/model/profile_types.go b/pkg/model/profile_types.go deleted file mode 100644 index 35d724cf08..0000000000 --- a/pkg/model/profile_types.go +++ /dev/null @@ -1,163 +0,0 @@ -package model - -import ( - typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" -) - -// ProfileTypeInfo contains aggregation and semantic information for profile types -type ProfileTypeInfo struct { - Name string - Description string - Group string - Unit string - IsCumulative bool - AggregationType typesv1.TimeSeriesAggregationType -} - -// ProfilesTypeRegistry contains all known profile types -var ProfilesTypeRegistry = map[string]ProfileTypeInfo{ - "samples": { - Name: "samples", - Description: "Number of sampling events collected", - Group: "cpu", - Unit: "short", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, - }, - "cpu": { - Name: "cpu", - Description: "CPU time consumed", - Group: "cpu", - Unit: "ns", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "inuse_objects": { - Name: "inuse_objects", - Description: "Number of objects currently in use", - Group: "memory", - Unit: "short", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE, - }, - "inuse_space": { - Name: "inuse_space", - Description: "Size of memory currently in use", - Group: "memory", - Unit: "bytes", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE, - }, - "alloc_objects": { - Name: "alloc_objects", - Description: "Number of objects allocated", - Group: "memory", - Unit: "short", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "alloc_space": { - Name: "alloc_space", - Description: "Size of memory allocated in the heap", - Group: "memory", - Unit: "bytes", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "alloc_samples": { - Name: "alloc_samples", - Description: "Number of memory allocation samples during CPU time", - Group: "memory", - Unit: "short", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "alloc_size": { - Name: "alloc_size", - Description: "Size of memory allocated during CPU time", - Group: "memory", - Unit: "bytes", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "alloc_in_new_tlab_bytes": { - Name: "alloc_in_new_tlab_bytes", - Description: "Size of memory allocated inside Thread-Local Allocation Buffers", - Group: "memory", - Unit: "bytes", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "alloc_in_new_tlab_objects": { - Name: "alloc_in_new_tlab_objects", - Description: "Number of objects allocated inside Thread-Local Allocation Buffers", - Group: "memory", - Unit: "short", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "goroutine": { - Name: "goroutine", - Description: "Number of goroutines", - Group: "goroutine", - Unit: "short", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE, - }, - "contentions": { - Name: "contentions", - Description: "Number of lock contentions observed", - Group: "locks", - Unit: "short", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, - }, - "lock_count": { - Name: "lock_count", - Description: "Number of lock acquisitions attempted", - Group: "locks", - Unit: "short", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, - }, - "delay": { - Name: "delay", - Description: "Time spent in blocking delays", - Group: "locks", - Unit: "ns", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "lock_time": { - Name: "lock_time", - Description: "Cumulative time spent acquiring locks", - Group: "locks", - Unit: "ns", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - }, - "exceptions": { - Name: "exceptions", - Description: "Number of exceptions within the sampled CPU time", - Group: "exceptions", - Unit: "short", - IsCumulative: false, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM, - }, -} - -// GetProfileTypeInfo returns aggregation information for a profile type -func GetProfileTypeInfo(profileType string) ProfileTypeInfo { - if info, exists := ProfilesTypeRegistry[profileType]; exists { - return info - } - - return ProfileTypeInfo{ - Name: profileType, - Description: "Unknown profile type", - Group: "unknown", - Unit: "unknown", - IsCumulative: true, - AggregationType: typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE, - } -} diff --git a/pkg/model/time_series.go b/pkg/model/time_series.go index b26f2e4e0f..76e0ccbf97 100644 --- a/pkg/model/time_series.go +++ b/pkg/model/time_series.go @@ -69,19 +69,14 @@ type TimeSeriesAggregator interface { GetTimestamp() int64 } -func NewTimeSeriesAggregator(labels []*typesv1.LabelPair, stepDurationSec float64, aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator { - profileType := getProfileTypeName(labels) - profileInfo := GetProfileTypeInfo(profileType) - - // If explicit aggregation type is provided, respect it for instant profiles only - if aggregation != nil && *aggregation == typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE { - // For instant profiles, use average aggregation - if !profileInfo.IsCumulative { - return &avgTimeSeriesAggregator{ts: -1} - } +func NewTimeSeriesAggregator(stepDurationSec float64, aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator { + // Default to sum for backward compatibility + aggregationType := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM + if aggregation != nil { + aggregationType = *aggregation } - switch profileInfo.AggregationType { + switch aggregationType { case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE: return &rateTimeSeriesAggregator{ts: -1, stepDurationSec: stepDurationSec} case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE: @@ -89,18 +84,8 @@ func NewTimeSeriesAggregator(labels []*typesv1.LabelPair, stepDurationSec float6 case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM: return &sumTimeSeriesAggregator{ts: -1} default: - return &rateTimeSeriesAggregator{ts: -1, stepDurationSec: stepDurationSec} - } -} - -// getProfileTypeName extracts the profile type name from labels -func getProfileTypeName(labels []*typesv1.LabelPair) string { - for _, label := range labels { - if label.Name == "__type__" { - return label.Value - } + return &sumTimeSeriesAggregator{ts: -1} } - return "" } type sumTimeSeriesAggregator struct { @@ -219,7 +204,7 @@ Outer: point := it.At() aggregator, ok := aggregators[point.LabelsHash] if !ok { - aggregator = NewTimeSeriesAggregator(point.Lbs, stepDurationSec, aggregation) + aggregator = NewTimeSeriesAggregator(stepDurationSec, aggregation) aggregators[point.LabelsHash] = aggregator } if point.Ts > currentStep { diff --git a/pkg/model/time_series_test.go b/pkg/model/time_series_test.go index bd06018cf9..255e0424c7 100644 --- a/pkg/model/time_series_test.go +++ b/pkg/model/time_series_test.go @@ -163,37 +163,37 @@ func Test_RangeSeriesRateNormalization(t *testing.T) { cpuLabels := NewLabelsBuilder(nil).Set("__type__", "cpu").Labels() testData := []TimeSeriesValue{ - {Ts: 1000, Value: 50, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, - {Ts: 1000, Value: 50, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, + {Ts: 0, Value: 0, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=0: start + {Ts: 15000, Value: 30, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=15s: 30s CPU + {Ts: 30000, Value: 0, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=30s: reset + {Ts: 45000, Value: 30, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=45s: 30s CPU again } - // Test with 1 second steps (step = 1000ms) - in1s := iter.NewSliceIterator(testData) - out1s := RangeSeries(in1s, 1000, 1000, 1000, nil) + // Test with 15 second steps (step = 15000ms) - creates 4 bucket + in15s := iter.NewSliceIterator(testData) + rateAggregation := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE + out15s := RangeSeries(in15s, 0, 60000, 15000, &rateAggregation) - // Test with 5 second steps (step = 5000ms) - in5s := iter.NewSliceIterator(testData) - out5s := RangeSeries(in5s, 1000, 1000, 5000, nil) + // Test with 30 second steps (step = 30000ms) - creates 2 buckets + in30s := iter.NewSliceIterator(testData) + out30s := RangeSeries(in30s, 0, 60000, 30000, &rateAggregation) - if len(out1s) != 1 || len(out1s[0].Points) != 1 { - t.Fatal("Expected 1 series with 1 point for 1s test") + var total15s, total30s float64 + for _, series := range out15s { + for _, point := range series.Points { + total15s += point.Value * 15.0 + } } - if len(out5s) != 1 || len(out5s[0].Points) != 1 { - t.Fatal("Expected 1 series with 1 point for 5s test") - } - - rate1s := out1s[0].Points[0].Value - rate5s := out5s[0].Points[0].Value - if rate1s <= rate5s { - t.Errorf("Expected 1s rate (%f) to be higher than 5s rate (%f) due to normalization", rate1s, rate5s) + for _, series := range out30s { + for _, point := range series.Points { + total30s += point.Value * 30.0 + } } - expectedRate1s := 100.0 / 1.0 - expectedRate5s := 100.0 / 5.0 - - assert.Equal(t, expectedRate1s, rate1s) - assert.Equal(t, expectedRate5s, rate5s) + assert.Equal(t, total15s, total30s) + assert.Equal(t, 60.0, total15s) + assert.Equal(t, 60.0, total30s) } func Test_NewTimeSeriesAggregatorSelection(t *testing.T) { @@ -201,78 +201,45 @@ func Test_NewTimeSeriesAggregatorSelection(t *testing.T) { tests := []struct { name string - profileType string + aggregation *typesv1.TimeSeriesAggregationType expectedType string description string }{ { - name: "CPU samples uses sum aggregation", - profileType: "samples", + name: "No aggregation defaults to sum for backward compatibility", + aggregation: nil, expectedType: "sum", - description: "CPU samples are count of sampling events (instant)", - }, - { - name: "CPU time uses rate normalization", - profileType: "cpu", - expectedType: "rate", - description: "CPU time is cumulative - actual time consumed over duration", - }, - { - name: "Allocated objects uses rate normalization", - profileType: "alloc_objects", - expectedType: "rate", - description: "Cumulative allocation profiles need rate normalization", }, { - name: "Contentions use sum aggregation", - profileType: "contentions", + name: "Explicit sum aggregation", + aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM}[0], expectedType: "sum", - description: "Contention events are instant counts that should be summed", }, { - name: "Lock time uses rate normalization", - profileType: "lock_time", + name: "Explicit rate aggregation", + aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE}[0], expectedType: "rate", - description: "Lock time is cumulative duration that needs rate normalization", }, { - name: "In-use objects uses average aggregation", - profileType: "inuse_objects", + name: "Explicit average aggregation", + aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE}[0], expectedType: "avg", - description: "Instant memory profiles should be averaged", - }, - { - name: "Goroutines use average aggregation", - profileType: "goroutine", - expectedType: "avg", - description: "Goroutine counts are instant values that should be averaged", - }, - { - name: "Unknown types default to rate normalization", - profileType: "unknown_profile_type", - expectedType: "rate", - description: "Unknown profile types default to cumulative for safety", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - labels := []*typesv1.LabelPair{ - {Name: "__type__", Value: tt.profileType}, - } - - aggregator := NewTimeSeriesAggregator(labels, stepDuration, nil) - actualType := getAggregatorType(aggregator) - + aggregator := NewTimeSeriesAggregator(stepDuration, tt.aggregation) + actualType := getAggregatorType(t, aggregator) if actualType != tt.expectedType { - t.Errorf("Expected %s aggregator for %s, got %s. %s", - tt.expectedType, tt.profileType, actualType, tt.description) + t.Errorf("Expected %s aggregator, got %s.", tt.expectedType, actualType) } }) } } -func getAggregatorType(agg TimeSeriesAggregator) string { +func getAggregatorType(t *testing.T, agg TimeSeriesAggregator) string { + t.Helper() switch agg.(type) { case *sumTimeSeriesAggregator: return "sum" diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index dd6c66ef27..9aaa3a9992 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -509,12 +509,15 @@ func TestSelectSeries(t *testing.T) { {true, "WithBlockHints"}, } { t.Run(tc.name, func(t *testing.T) { + foobarlabels := phlaremodel.NewLabelsBuilder(nil).Set("foo", "bar").Labels() + req := connect.NewRequest(&querierv1.SelectSeriesRequest{ LabelSelector: `{app="foo"}`, ProfileTypeID: "memory:inuse_space:bytes:space:byte", Start: 0, End: 2, Step: 0.001, + // No aggregation specified - uses default (sum) for backward compatibility }) bidi1 := newFakeBidiClientSeries([]*ingestv1.ProfileSets{ {