Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,15 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
}
minEngineOverlap := labelRanges.minOverlap()

// Preprocess rewrite distributable averages as sum/count
var warns = annotations.New()
// TODO(fpetkovski): Consider changing TraverseBottomUp to pass in a list of parents in the transform function.
parents := make(map[*Node]*Node)
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
parents[current] = parent
return false
})

// Preprocess rewrite distributable averages as sum/count
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
if !(isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(current)) {
return true
Expand Down Expand Up @@ -213,12 +220,6 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
return !(isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(parent))
})

// TODO(fpetkovski): Consider changing TraverseBottomUp to pass in a list of parents in the transform function.
parents := make(map[*Node]*Node)
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
parents[current] = parent
return false
})
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
// If the current operation is not distributive, stop the traversal.
if !isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) {
Expand Down Expand Up @@ -257,6 +258,22 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
*current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap)
return true
})

// HACK: Postprocess rewrite all "Selectors" that we missed to remote queries as base case
stop := false
Traverse(&plan, func(current *Node) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why the optimizer won't catch this case.

if stop {
return
}
switch (*current).(type) {
case *RemoteExecution:
// No need to distribute below a remote execution
stop = true
case *VectorSelector, *MatrixSelector:
*current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap)
}
})

return plan, *warns
}

Expand Down Expand Up @@ -532,7 +549,7 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string

switch e := (*expr).(type) {
case Deduplicate, RemoteExecution:
return false
return true
case *Binary:
if isBinaryExpressionWithOneScalarSide(e) {
return true
Expand Down
50 changes: 47 additions & 3 deletions logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ package logicalplan

import (
"math"
"math/rand"
"regexp"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/query"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)
Expand All @@ -32,6 +35,16 @@ func TestDistributedExecution(t *testing.T) {
expectWarn bool
expected string
}{
{
name: "this is a bug",
expr: `group by (region) (foo) * on (region) bar`,
expected: `
group by (region) (
dedup(
remote(group by (region) (foo)),
remote(group by (region) (foo)))
) * on (region) dedup(remote(bar), remote(bar))`,
},
{
name: "selector",
expr: `http_requests_total`,
Expand Down Expand Up @@ -568,7 +581,7 @@ dedup(
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC])`,
},
{
name: "subquery with a total 4h range is cannot be distributed",
name: "subquery with a total 6h range is cannot be distributed",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm maybe we want to add a new test case?

firstEngineOpts: engineOpts{
minTime: queryStart,
maxTime: time.Unix(0, 0).Add(eightHours),
Expand All @@ -577,8 +590,8 @@ dedup(
minTime: time.Unix(0, 0).Add(sixHours),
maxTime: queryEnd,
},
expr: `sum_over_time(sum_over_time(metric[2h])[2h:30m])`,
expected: `sum_over_time(sum_over_time(metric[2h])[2h:30m])`,
expr: `sum_over_time(sum_over_time(metric[4h])[2h:30m])`,
expected: `sum_over_time(sum_over_time(metric[4h])[2h:30m])`,
},
{
name: "sum over 3h does not distribute the query due to insufficient engine overlap",
Expand Down Expand Up @@ -818,6 +831,37 @@ sum(dedup(
testutil.Assert(t, len(vs1.LabelMatchers) == len(vs0.LabelMatchers)-1, "expected %d label matchers, got %d", len(vs0.LabelMatchers)-1, len(vs1.LabelMatchers))
}

func FuzzDistributedEnginePlanNoUnnecessarySelectCalls(f *testing.F) {
engines := []api.RemoteEngine{
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "east")}),
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "west")}),
}
f.Add(int64(0))
f.Fuzz(func(t *testing.T, seed int64) {
rnd := rand.New(rand.NewSource(seed))
ps := promqlsmith.New(rnd, []labels.Labels{
labels.FromStrings(model.MetricNameLabel, "foo", "bar", "baz", "region", "east"),
labels.FromStrings(model.MetricNameLabel, "foo", "bar", "quz", "region", "west"),
})
expr := ps.WalkInstantQuery()
lplan, _ := NewFromAST(expr, &query.Options{}, PlanOptions{})

oplan, _ := lplan.Optimize([]Optimizer{
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
})
root := oplan.Root()

Traverse(&root, func(n *Node) {
switch (*n).(type) {
case *MatrixSelector:
t.Fatal("unexpected matrixselector in: ", renderExprTree(root), "from: ", expr.String())
case *VectorSelector:
t.Fatal("unexpected vectorselector in: ", renderExprTree(root), "from: ", expr.String())
}
})
})
}

type engineMock struct {
api.RemoteEngine
minT int64
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
int64(26)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
int64(53)
Loading