Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions logicalplan/set_projection_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package logicalplan

import (
"maps"
"slices"

Check failure on line 6 in logicalplan/set_projection_labels.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

File is not properly formatted (gci)
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/query"
)

// setProjectionLabels is an optimizer that sets the projection labels for all vector selectors.
// If a projection is already set as a matcher, it will be materialized in the selector
// and the matcher will be removed.
// This is useful for sending projections as part of a remote query.
type setProjectionLabels struct{}

const seriesIDLabel = "__series__id"

func (s setProjectionLabels) Optimize(expr Node, _ *query.Options) (Node, annotations.Annotations) {
var hasProjections bool
TraverseBottomUp(nil, &expr, func(_ *Node, current *Node) bool {
switch e := (*current).(type) {
case *VectorSelector:
// Check if a projection is already set in the node.
hasProjections = e.Projection.Include || len(e.Projection.Labels) > 0
}
return hasProjections
})
if hasProjections {
return expr, annotations.Annotations{}
}

var projection Projection
return s.optimize(expr, projection)
}

func (s setProjectionLabels) optimize(expr Node, projection Projection) (Node, annotations.Annotations) {
var stop bool
Traverse(&expr, func(current *Node) {
if stop {
return
}
switch e := (*current).(type) {
case *Aggregation:
switch e.Op {
case parser.TOPK, parser.BOTTOMK:
projection = Projection{}
default:
projection = Projection{
Labels: slices.Clone(e.Grouping),
Include: !e.Without,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I know how your scanner is implemented when it is a without? Does it return series hash label in this case?

https://github.com/thanos-io/promql-engine/pull/549/files#diff-2d378c97867270a9cf6883930a4a229b3928d637e4c5c558abf5f4bd913f2f8cR41

This is what I implemented as a mock storage which does projection and always return series hash when projection is available. So maybe there is some difference here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it is without, it returns the hash over all labels, and the labels which are not in the Labels slice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to include series hash in the grouping label for without? So that it can be removed after the aggregation.

Or you have another place to remove series hash label?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll need to take a look at the details, will post a bit later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we dont need to include because the hash will be just an implementation detail of the storage. We use it for things like: partitioning, map series to iterators, etc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We include the hash label only when Include is true. We also have an explicit select for the series hash label for some binary joins as shown in tests.

Copy link
Contributor

@yeya24 yeya24 Apr 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have hash label when without then how do we pass the duplicate labels check in the engine? Do we have to disable it.

I am trying to run some correctness tests and saw some issues based on my setup. I assume something wrong in my mock storage implementation. Maybe you can try to add a similar correctness test using a mock storage and see if there is any correctness issue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We disabled those duplicate checks, and afaik prometheus has a feature to do that now as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. But I can still see some correctness issues after disabling duplicate check so maybe you can take a closer look

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's disable them here too - with a hash column we won't get issues though since we never drop it i think

}
return
case *FunctionCall:
switch e.Func.Name {
case "absent_over_time", "absent", "scalar":
projection = Projection{Include: true}
case "label_replace":
switch projection.Include {
case true:
if slices.Contains(projection.Labels, UnsafeUnwrapString(e.Args[1])) {
projection.Labels = append(projection.Labels, UnsafeUnwrapString(e.Args[3]))
}
case false:
if !slices.Contains(projection.Labels, UnsafeUnwrapString(e.Args[1])) {
projection.Labels = slices.DeleteFunc(projection.Labels, func(s string) bool {
return s == UnsafeUnwrapString(e.Args[3])
})
}
}
}
case *Binary:
var highCard, lowCard = e.LHS, e.RHS
if e.VectorMatching == nil || (!e.VectorMatching.On && len(e.VectorMatching.MatchingLabels) == 0) {
if IsConstantExpr(lowCard) {
s.optimize(highCard, projection)
} else {
s.optimize(highCard, Projection{})
}

if IsConstantExpr(highCard) {
s.optimize(lowCard, projection)
} else {
s.optimize(lowCard, Projection{})
}
stop = true
return
}
if e.VectorMatching.Card == parser.CardOneToMany {
highCard, lowCard = lowCard, highCard
}

hcProjection := extendProjection(projection, e.VectorMatching.MatchingLabels)
s.optimize(highCard, hcProjection)
lcProjection := extendProjection(Projection{
Include: e.VectorMatching.On,
Labels: append([]string{seriesIDLabel}, e.VectorMatching.MatchingLabels...),
}, e.VectorMatching.Include)
s.optimize(lowCard, lcProjection)
stop = true
case *VectorSelector:
slices.Sort(projection.Labels)
projection.Labels = slices.Compact(projection.Labels)
e.Projection = projection
projection = Projection{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you also need to change MergeSelect or just modify the scanner to get it work with Projection? Like the selects you merged might have different projection labels and you need to fetch both and project only required labels for each select

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this is a good question, we do not use MergeSelect anymore, but I think we should modify it to be a union of all selectors that it merges.

}
})

return expr, annotations.Annotations{}
}

func extendProjection(projection Projection, lbls []string) Projection {
var extendedLabels []string
if projection.Include {
extendedLabels = union(projection.Labels, lbls)
} else {
extendedLabels = intersect(projection.Labels, lbls)
}
return Projection{
Include: projection.Include,
Labels: extendedLabels,
}
}

// union returns the union of two string slices.
func union(l1 []string, l2 []string) []string {
m := make(map[string]struct{})
for _, s := range l1 {
m[s] = struct{}{}
}
for _, s := range l2 {
m[s] = struct{}{}
}
return slices.Collect(maps.Keys(m))
}

// intersect returns the intersection of two string slices.
func intersect(l1 []string, l2 []string) []string {
m := make(map[string]struct{})
var result []string
for _, s := range l1 {
m[s] = struct{}{}
}
for _, s := range l2 {
if _, ok := m[s]; ok {
result = append(result, s)
}
}
return result
}
236 changes: 236 additions & 0 deletions logicalplan/set_projection_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package logicalplan

import (
"fmt"
"strings"
"testing"

Check failure on line 7 in logicalplan/set_projection_labels_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

File is not properly formatted (gci)
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/annotations"
"github.com/stretchr/testify/require"
"github.com/thanos-io/promql-engine/query"
)

var reSpaces = strings.NewReplacer("\n", "", "\t", "")

func TestOptimizeSetProjectionLabels(t *testing.T) {
cases := []struct {
name string
expr string
expected string
}{
{
name: "simple vector selector",
expr: `metric_a{job="api-server"}`,
expected: `metric_a{job="api-server"}[exclude()]`,
},
{
name: "top-level label_replace",
expr: `label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)")`,
expected: `label_replace(kube_node_info{node="gke-1"}[exclude()], "instance", "$1", "node", "(.*)")`,
},
{
name: "sum by all labels",
expr: `sum(label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`,
expected: `sum(label_replace(kube_node_info{node="gke-1"}[project()], "instance", "$1", "node", "(.*)"))`,
},
{
name: "sum by target label",
expr: `sum by (instance) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`,
expected: `sum by (instance) (label_replace(kube_node_info{node="gke-1"}[project(instance, node)], "instance", "$1", "node", "(.*)"))`,
},
{
name: "sum not including target label",
expr: `sum by (node, region) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`,
expected: `sum by (node, region) (label_replace(kube_node_info{node="gke-1"}[project(node, region)], "instance", "$1", "node", "(.*)"))`,
},
{
name: "sum by source and target label",
expr: `sum by (node, instance, region) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`,
expected: `sum by (node, instance, region) (label_replace(kube_node_info{node="gke-1"}[project(instance, node, region)], "instance", "$1", "node", "(.*)"))`,
},
{
name: "multiple label replace calls",
expr: `
sum by (instance, node, region) (
label_replace(
label_replace(kube_node_info{node="gke-1"}, "ip-addr", "$1", "ip", "(.*)"),
"instance", "$1", "node", "(.*)"
)
)`,
expected: `sum by (instance, node, region) (label_replace(label_replace(kube_node_info{node="gke-1"}[project(instance, node, region)], "ip-addr", "$1", "ip", "(.*)"), "instance", "$1", "node", "(.*)"))`,
},
{
name: "sum without",
expr: `sum without (xyz) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`,
expected: `sum without (xyz) (label_replace(kube_node_info{node="gke-1"}[exclude(xyz)], "instance", "$1", "node", "(.*)"))`,
},
{
name: "absent",
expr: `absent(kube_node_info{node="gke-1"})`,
expected: `absent(kube_node_info{node="gke-1"}[project()])`,
},
{
name: "aggregation with grouping",
expr: `sum by (pod) (kube_node_info{node="gke-1"})`,
expected: `sum by (pod) (kube_node_info{node="gke-1"}[project(pod)])`,
},
{
name: "double aggregation with grouping",
expr: `max by (pod) (sum by (pod, target) (kube_node_info{node="gke-1"}))`,
expected: `max by (pod) (sum by (pod, target) (kube_node_info{node="gke-1"}[project(pod, target)]))`,
},
{
name: "double aggregation with by and without grouping",
expr: `max by (pod) (sum without (pod, target) (kube_node_info{node="gke-1"}))`,
expected: `max by (pod) (sum without (pod, target) (kube_node_info{node="gke-1"}[exclude(pod, target)]))`,
},
{
name: "double aggregation with by and without grouping",
expr: `max by (pod) (sum without (target) (kube_node_info{node="gke-1"}))`,
expected: `max by (pod) (sum without (target) (kube_node_info{node="gke-1"}[exclude(target)]))`,
},
{
name: "aggregation without grouping",
expr: `sum without (pod) (kube_node_info{node="gke-1"})`,
expected: `sum without (pod) (kube_node_info{node="gke-1"}[exclude(pod)])`,
},
{
name: "aggregation with binary expression",
expr: `sum without (pod) (metric_a * on (node) metric_b)`,
expected: `sum without (pod) (metric_a[exclude()] * on (node) metric_b[project(__series__id, node)])`,
},
{
name: "binary expression with vector and constant",
expr: `sum(metric_a * 3)`,
expected: `sum(metric_a[project()] * 3)`,
},
{
name: "binary expression with aggregation and constant",
expr: `sum(metric_a) * 3`,
expected: `sum(metric_a[project()]) * 3`,
},
{
name: "binary expression with one to one matching",
expr: `metric_a - metric_b`,
expected: `metric_a[exclude()] - metric_b[exclude()]`,
},
{
name: "binary expression with one to one matching on label",
expr: `metric_a - on (node) metric_b`,
expected: `metric_a[exclude()] - on (node) metric_b[project(__series__id, node)]`,
},
{
name: "binary expression with one to one matching on label group_left",
expr: `metric_a - on (node) group_left (cluster) metric_b`,
expected: `metric_a[exclude()] - on (node) group_left (cluster) metric_b[project(__series__id, cluster, node)]`,
},
{
name: "binary expression with one to one matching on label group_right",
expr: `metric_a - on (node) group_right (cluster) metric_b`,
expected: `metric_a[project(__series__id, cluster, node)] - on (node) group_right (cluster) metric_b[exclude()]`,
},
{
name: "aggregation with binary expression and one to one matching",
expr: `max by (k8s_cluster) (metric_a * up)`,
expected: `max by (k8s_cluster) (metric_a[exclude()] * up[exclude()])`,
},
{
name: "aggregation with binary expression with one to one matching on one label",
expr: `max by (k8s_cluster) (metric_a * on(node) up)`,
expected: `max by (k8s_cluster) (metric_a[project(k8s_cluster, node)] * on (node) up[project(__series__id, node)])`,
},
{
name: "aggregation with binary expression with matching one label group_left",
expr: `max by (k8s_cluster) (metric_a * on(node) group_left(hostname) up)`,
expected: `max by (k8s_cluster) (metric_a[project(k8s_cluster, node)] * on (node) group_left (hostname) up[project(__series__id, hostname, node)])`,
},
{
name: "aggregation with binary expression with matching one label group_right",
expr: `max by (k8s_cluster) (metric_a * on(node) group_right(hostname) up)`,
expected: `max by (k8s_cluster) (metric_a[project(__series__id, hostname, node)] * on (node) group_right (hostname) up[project(k8s_cluster, node)])`,
},
{
name: "binary expression with aggregation and label replace",
expr: `
topk(5,
sum by (k8s_cluster) (
max(metric_a) by (node)
* on(node) group_right(kubernetes_io_hostname) label_replace(label_replace(label_replace(up, "node", "$1", "kubernetes_io_hostname", "(.*)"),"node_role", "$1", "role", "(.*)"), "region", "$1", "topology_kubernetes_io_region", "(.*)")
* on(k8s_cluster) group_left(project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*")
)
)`,
expected: `
topk(5,
sum by (k8s_cluster) (
max by (node) (metric_a[project(node)])
* on (node) group_right (kubernetes_io_hostname) label_replace(label_replace(label_replace(up[project(k8s_cluster, kubernetes_io_hostname, node)], "node", "$1", "kubernetes_io_hostname", "(.*)"), "node_role", "$1", "role", "(.*)"), "region", "$1", "topology_kubernetes_io_region", "(.*)")
* on (k8s_cluster) group_left (project) label_replace(k8s_cluster_info[project(__series__id, cluster, k8s_cluster, project)], "k8s_cluster", "$0", "cluster", ".*")))`,
},
{
name: "binary expression with aggregation and label replace",
expr: `
count by (cluster) (
label_replace(up, "region", "$0", "region", ".*")
* on(cluster, region) group_left(project) label_replace(max by(project, region, cluster)(k8s_cluster_info), "k8s_cluster", "$0", "cluster", ".*")
)`,
expected: `
count by (cluster) (
label_replace(up[project(cluster, region)], "region", "$0", "region", ".*")
* on (cluster, region) group_left (project) label_replace(max by (project, region, cluster) (
k8s_cluster_info[project(cluster, project, region)]), "k8s_cluster", "$0", "cluster", ".*"))`,
},
}

for _, c := range cases {
t.Run(c.expr, func(t *testing.T) {
expr, err := parser.ParseExpr(c.expr)
require.NoError(t, err)
plan := NewFromAST(expr, &query.Options{}, PlanOptions{})
optimized, annos := plan.Optimize(
[]Optimizer{
setProjectionLabels{},
// This is a dummy optimizer that replaces VectorSelectors with a custom struct
// which has a custom String() method.
swapVectorSelectors{},
})

require.Equal(t, annotations.Annotations{}, annos)
require.Equal(t, reSpaces.Replace(c.expected), reSpaces.Replace(optimized.Root().String()))
})
}
}

type swapVectorSelectors struct{}

func (s swapVectorSelectors) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) {
TraverseBottomUp(nil, &plan, func(_, expr *Node) bool {
switch v := (*expr).(type) {
case *VectorSelector:
*expr = newVectorOutput(v)
return true
}
return false
})
return plan, annotations.Annotations{}
}

type vectorOutput struct {
*VectorSelector
}

func newVectorOutput(vectorSelector *VectorSelector) *vectorOutput {
return &vectorOutput{
VectorSelector: vectorSelector,
}
}

func (vs vectorOutput) String() string {
var projectionType string
if vs.Projection.Include {
projectionType = "project"
} else {
projectionType = "exclude"
}
return fmt.Sprintf("%s[%s(%s)]", vs.VectorSelector.String(), projectionType, strings.Join(vs.Projection.Labels, ", "))
}
Loading