Skip to content

fix: time series aggregation inconsistency across different time windows #4255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions api/gen/proto/go/types/v1/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion api/openapiv2/gen/phlare.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2387,7 +2387,8 @@
"type": "string",
"enum": [
"TIME_SERIES_AGGREGATION_TYPE_SUM",
"TIME_SERIES_AGGREGATION_TYPE_AVERAGE"
"TIME_SERIES_AGGREGATION_TYPE_AVERAGE",
"TIME_SERIES_AGGREGATION_TYPE_RATE"
],
"default": "TIME_SERIES_AGGREGATION_TYPE_SUM"
},
Expand Down
1 change: 1 addition & 0 deletions api/types/v1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message BlockCompaction {
enum TimeSeriesAggregationType {
TIME_SERIES_AGGREGATION_TYPE_SUM = 0;
TIME_SERIES_AGGREGATION_TYPE_AVERAGE = 1;
TIME_SERIES_AGGREGATION_TYPE_RATE = 2;
}

// StackTraceSelector is used for filtering stack traces by locations.
Expand Down
20 changes: 3 additions & 17 deletions pkg/experiment/query_backend/query_time_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package query_backend
import (
"strings"
"sync"
"time"

"github.com/grafana/dskit/runutil"
"github.com/parquet-go/parquet-go"

queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
parquetquery "github.com/grafana/pyroscope/pkg/phlaredb/query"
Expand Down Expand Up @@ -111,30 +109,18 @@ func newTimeSeriesAggregator(req *queryv1.InvokeRequest) aggregator {
func (a *timeSeriesAggregator) aggregate(report *queryv1.Report) error {
r := report.TimeSeries
a.init.Do(func() {
a.series = phlaremodel.NewTimeSeriesMerger(true)
a.series = phlaremodel.NewTimeSeriesMerger(false)
a.query = r.Query.CloneVT()
})
a.series.MergeTimeSeries(r.TimeSeries)
return nil
}

func (a *timeSeriesAggregator) build() *queryv1.Report {
// TODO(kolesnikovae): Average aggregation should be implemented in
// the way that it can be distributed (count + sum), and should be done
// at "aggregate" call.
sum := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM
stepMilli := time.Duration(a.query.GetStep() * float64(time.Second)).Milliseconds()
seriesIterator := phlaremodel.NewTimeSeriesMergeIterator(a.series.TimeSeries())
return &queryv1.Report{
TimeSeries: &queryv1.TimeSeriesReport{
Query: a.query,
TimeSeries: phlaremodel.RangeSeries(
seriesIterator,
a.startTime,
a.endTime,
stepMilli,
&sum,
),
Query: a.query,
TimeSeries: a.series.TimeSeries(),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (q *QueryFrontend) SelectSeries(
if report == nil {
return connect.NewResponse(&querierv1.SelectSeriesResponse{}), nil
}
series := phlaremodel.TopSeries(report.TimeSeries.TimeSeries, int(c.Msg.GetLimit()))

it := phlaremodel.NewTimeSeriesMergeIterator(report.TimeSeries.TimeSeries)
aggregatedSeries := phlaremodel.RangeSeries(it, c.Msg.Start, c.Msg.End, stepMs, c.Msg.Aggregation)
series := phlaremodel.TopSeries(aggregatedSeries, int(c.Msg.GetLimit()))
return connect.NewResponse(&querierv1.SelectSeriesResponse{Series: series}), nil
}
55 changes: 49 additions & 6 deletions pkg/model/time_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ type TimeSeriesAggregator interface {
GetTimestamp() int64
}

func NewTimeSeriesAggregator(aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator {
if aggregation == nil {
return &sumTimeSeriesAggregator{ts: -1}
func NewTimeSeriesAggregator(stepDurationSec float64, aggregation *typesv1.TimeSeriesAggregationType) TimeSeriesAggregator {
// Default to sum for backward compatibility
aggregationType := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM
if aggregation != nil {
aggregationType = *aggregation
}
if *aggregation == typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE {

switch aggregationType {
case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE:
return &rateTimeSeriesAggregator{ts: -1, stepDurationSec: stepDurationSec}
case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE:
return &avgTimeSeriesAggregator{ts: -1}
case typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM:
return &sumTimeSeriesAggregator{ts: -1}
default:
return &sumTimeSeriesAggregator{ts: -1}
}
return &sumTimeSeriesAggregator{ts: -1}
}

type sumTimeSeriesAggregator struct {
Expand Down Expand Up @@ -142,13 +151,47 @@ func (a *avgTimeSeriesAggregator) GetAndReset() *typesv1.Point {
func (a *avgTimeSeriesAggregator) IsEmpty() bool { return a.ts == -1 }
func (a *avgTimeSeriesAggregator) GetTimestamp() int64 { return a.ts }

// rateTimeSeriesAggregator normalizes cumulative profile values by step duration to produce rates
type rateTimeSeriesAggregator struct {
ts int64
sum float64
stepDurationSec float64
annotations []*typesv1.ProfileAnnotation
}

func (a *rateTimeSeriesAggregator) Add(ts int64, point *TimeSeriesValue) {
a.ts = ts
a.sum += point.Value
a.annotations = append(a.annotations, point.Annotations...)
}

func (a *rateTimeSeriesAggregator) GetAndReset() *typesv1.Point {
tsCopy := a.ts
// Normalize cumulative values by step duration to produce rates
normalizedValue := a.sum / a.stepDurationSec
annotationsCopy := make([]*typesv1.ProfileAnnotation, len(a.annotations))
copy(annotationsCopy, a.annotations)
a.ts = -1
a.sum = 0
a.annotations = a.annotations[:0]
return &typesv1.Point{
Timestamp: tsCopy,
Value: normalizedValue,
Annotations: annotationsCopy,
}
}

func (a *rateTimeSeriesAggregator) IsEmpty() bool { return a.ts == -1 }
func (a *rateTimeSeriesAggregator) GetTimestamp() int64 { return a.ts }
Comment on lines +154 to +185
Copy link
Collaborator

@kolesnikovae kolesnikovae Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not very sure this is what users expect from rate and normalization – in the test I see that reported rate changes depending on the step duration, which does not address the problem:

Time series visualization showed inconsistent values when querying the same workload across different time ranges

By definition, rate measures how a metric is changing over time (ΔV/Δt, and the Δt is fixed to 1s in our case): https://sourcegraph.com/github.com/prometheus/prometheus/-/blob/promql/functions.go?L71-73 is a good example of how we can calculate this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not too sure if we can do this much better, as we don't reliable have the duration (DurationNanos not set for every counter profile) of the profile and we also don't take it into account when selecting the time window.

I think fixing both of that is a massive change, that I don't think I want this tasl to turn into.

Let's say we have, we have 3 pods, two of them are using a scrape interval of 30s and one is using 15s. All 3 are using exactly one core all the time.

With normalising it to the step size, the result will be with step size 15s roughly this:

Pod    | 1 | 2 | 3 |
t+0s   | 0 | 2 | 1 | 
t+15s  | 2 | 0 | 1 |
t+30s  | 0 | 2 | 1 |
t+45s  | 2 | 0 | 1 |

total 3*4*15 = 180

And with step size 30s

Pod    | 1 | 2 | 3 |
t+0s   | 1 | 1 | 1 |
t+30s  | 1 | 1 | 1 |

total 3 * 2 * 30 = 180

I do think this looks very close to the correct data, it obviously is incorrect if you step size gets smaller than your collection window, that also the problem that prometheus has.

Copy link
Collaborator

@kolesnikovae kolesnikovae Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I often use this approach to estimate average core usage – for example, if you get an aggregate profile worth 10 hours of CPU time and the query time range is 1 hour, that's essentially 10h/1h = 10 cores on average. The "total" itself isn't the issue. Problems arise when we have gaps in the data – which is common with ingest sampling. Some users also implement their own strategies, like the classic "10s every 60s" sampling.

I'm also curious how this works with very narrow query time ranges, like when a user tries to fetch an individual profile. I might be mistaken, but IIRC we can receive a step as small as 1ms or so.

This approach works in many cases and could already improve the situation. I'd say we should try it out and see – it's likely we'll need to go through a few iterations. However, as you mentioned, the proper solution lies much closer to the data itself, and can't really be implemented without DurationNanos, which is a must for delta profiles (otherwise, we're stuck calculating deltas ourselves).

I also find it interesting that many continuous profiling solutions don't even have a timeline. In many cases, it doesn't make much sense, it's hard to get right, and even harder to build correctly if sampling takes place (and it should take place).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully agree with this we need to deploy this and find out 🙂

Btw: The minimum step size is configured in Grafana data source settings and will 15s by default: https://grafana.com/docs/grafana/latest/datasources/pyroscope/configure-pyroscope-data-source/#querying.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense. Let's test this out to see how it works and fix this separately #4192. Before that deploying this to dev, I want to address the other comment and remove from this PR the ProfileTypesRegistry in the backend for now.


// RangeSeries aggregates profiles into series.
// Series contains points spaced by step from start to end.
// Profiles from the same step are aggregated into one point.
func RangeSeries(it iter.Iterator[TimeSeriesValue], start, end, step int64, aggregation *typesv1.TimeSeriesAggregationType) []*typesv1.Series {
defer it.Close()
seriesMap := make(map[uint64]*typesv1.Series)
aggregators := make(map[uint64]TimeSeriesAggregator)
stepDurationSec := float64(step) / 1000.0

if !it.Next() {
return nil
Expand All @@ -161,7 +204,7 @@ Outer:
point := it.At()
aggregator, ok := aggregators[point.LabelsHash]
if !ok {
aggregator = NewTimeSeriesAggregator(aggregation)
aggregator = NewTimeSeriesAggregator(stepDurationSec, aggregation)
aggregators[point.LabelsHash] = aggregator
}
if point.Ts > currentStep {
Expand Down
129 changes: 113 additions & 16 deletions pkg/model/time_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/iter"
"github.com/grafana/pyroscope/pkg/testhelper"

"github.com/stretchr/testify/assert"
)

func Test_RangeSeriesSum(t *testing.T) {
seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Labels()
seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Labels()
seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Set("__type__", "contentions").Labels()
seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Set("__type__", "contentions").Labels()
for _, tc := range []struct {
name string
in []TimeSeriesValue
Expand All @@ -19,15 +21,16 @@ func Test_RangeSeriesSum(t *testing.T) {
{
name: "single series",
in: []TimeSeriesValue{
{Ts: 1, Value: 1},
{Ts: 1, Value: 1},
{Ts: 2, Value: 2},
{Ts: 3, Value: 3},
{Ts: 4, Value: 4},
{Ts: 5, Value: 5, Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}},
{Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 2, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 3, Value: 3, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 4, Value: 4, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 5, Value: 5, Lbs: seriesA, LabelsHash: seriesA.Hash(), Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}},
},
out: []*typesv1.Series{
{
Labels: seriesA,
Points: []*typesv1.Point{
{Timestamp: 1, Value: 2, Annotations: []*typesv1.ProfileAnnotation{}},
{Timestamp: 2, Value: 2, Annotations: []*typesv1.ProfileAnnotation{}},
Expand Down Expand Up @@ -83,8 +86,8 @@ func Test_RangeSeriesSum(t *testing.T) {
}

func Test_RangeSeriesAvg(t *testing.T) {
seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Labels()
seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Labels()
seriesA := NewLabelsBuilder(nil).Set("foo", "bar").Set("__type__", "inuse_objects").Labels()
seriesB := NewLabelsBuilder(nil).Set("foo", "buzz").Set("__type__", "inuse_objects").Labels()
for _, tc := range []struct {
name string
in []TimeSeriesValue
Expand All @@ -93,15 +96,16 @@ func Test_RangeSeriesAvg(t *testing.T) {
{
name: "single series",
in: []TimeSeriesValue{
{Ts: 1, Value: 1},
{Ts: 1, Value: 2},
{Ts: 2, Value: 2},
{Ts: 2, Value: 3},
{Ts: 3, Value: 4},
{Ts: 4, Value: 5, Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}},
{Ts: 1, Value: 1, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 1, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 2, Value: 2, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 2, Value: 3, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 3, Value: 4, Lbs: seriesA, LabelsHash: seriesA.Hash()},
{Ts: 4, Value: 5, Lbs: seriesA, LabelsHash: seriesA.Hash(), Annotations: []*typesv1.ProfileAnnotation{{Key: "foo", Value: "bar"}}},
},
out: []*typesv1.Series{
{
Labels: seriesA,
Points: []*typesv1.Point{
{Timestamp: 1, Value: 1.5, Annotations: []*typesv1.ProfileAnnotation{}}, // avg of 1 and 2
{Timestamp: 2, Value: 2.5, Annotations: []*typesv1.ProfileAnnotation{}}, // avg of 2 and 3
Expand Down Expand Up @@ -154,3 +158,96 @@ func Test_RangeSeriesAvg(t *testing.T) {
})
}
}

func Test_RangeSeriesRateNormalization(t *testing.T) {
cpuLabels := NewLabelsBuilder(nil).Set("__type__", "cpu").Labels()

testData := []TimeSeriesValue{
{Ts: 0, Value: 0, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=0: start
{Ts: 15000, Value: 30, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=15s: 30s CPU
{Ts: 30000, Value: 0, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=30s: reset
{Ts: 45000, Value: 30, Lbs: cpuLabels, LabelsHash: cpuLabels.Hash()}, // t=45s: 30s CPU again
}

// Test with 15 second steps (step = 15000ms) - creates 4 bucket
in15s := iter.NewSliceIterator(testData)
rateAggregation := typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE
out15s := RangeSeries(in15s, 0, 60000, 15000, &rateAggregation)

// Test with 30 second steps (step = 30000ms) - creates 2 buckets
in30s := iter.NewSliceIterator(testData)
out30s := RangeSeries(in30s, 0, 60000, 30000, &rateAggregation)

var total15s, total30s float64
for _, series := range out15s {
for _, point := range series.Points {
total15s += point.Value * 15.0
}
}

for _, series := range out30s {
for _, point := range series.Points {
total30s += point.Value * 30.0
}
}

assert.Equal(t, total15s, total30s)
assert.Equal(t, 60.0, total15s)
assert.Equal(t, 60.0, total30s)
}

func Test_NewTimeSeriesAggregatorSelection(t *testing.T) {
stepDuration := 5.0

tests := []struct {
name string
aggregation *typesv1.TimeSeriesAggregationType
expectedType string
description string
}{
{
name: "No aggregation defaults to sum for backward compatibility",
aggregation: nil,
expectedType: "sum",
},
{
name: "Explicit sum aggregation",
aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_SUM}[0],
expectedType: "sum",
},
{
name: "Explicit rate aggregation",
aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_RATE}[0],
expectedType: "rate",
},
{
name: "Explicit average aggregation",
aggregation: &[]typesv1.TimeSeriesAggregationType{typesv1.TimeSeriesAggregationType_TIME_SERIES_AGGREGATION_TYPE_AVERAGE}[0],
expectedType: "avg",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
aggregator := NewTimeSeriesAggregator(stepDuration, tt.aggregation)
actualType := getAggregatorType(t, aggregator)
if actualType != tt.expectedType {
t.Errorf("Expected %s aggregator, got %s.", tt.expectedType, actualType)
}
})
}
}

func getAggregatorType(t *testing.T, agg TimeSeriesAggregator) string {
t.Helper()
switch agg.(type) {
case *sumTimeSeriesAggregator:
return "sum"
case *avgTimeSeriesAggregator:
return "avg"
case *rateTimeSeriesAggregator:
return "rate"
default:
return "unknown"
}
}
Loading
Loading