-
Notifications
You must be signed in to change notification settings - Fork 70
Add optimizer for extracting projection labels #550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| package logicalplan | ||
|
|
||
| import ( | ||
| "maps" | ||
| "slices" | ||
|
|
||
| "github.com/prometheus/prometheus/promql/parser" | ||
| "github.com/prometheus/prometheus/util/annotations" | ||
| "github.com/thanos-io/promql-engine/query" | ||
| ) | ||
|
|
||
// setProjectionLabels is an optimizer that sets the projection labels for all vector selectors.
// If a projection is already set as a matcher, it will be materialized in the selector
// and the matcher will be removed.
// This is useful for sending projections as part of a remote query.
type setProjectionLabels struct{}

// seriesIDLabel is a synthetic label requested on the low-cardinality side of
// vector matches so joined series stay distinguishable after projection.
// NOTE(review): assumes the storage layer materializes this label when a
// projection is set — confirm against the scanner implementation.
const seriesIDLabel = "__series__id"
|
|
||
| func (s setProjectionLabels) Optimize(expr Node, _ *query.Options) (Node, annotations.Annotations) { | ||
| var hasProjections bool | ||
| TraverseBottomUp(nil, &expr, func(_ *Node, current *Node) bool { | ||
| switch e := (*current).(type) { | ||
| case *VectorSelector: | ||
| // Check if a projection is already set in the node. | ||
| hasProjections = e.Projection.Include || len(e.Projection.Labels) > 0 | ||
| } | ||
| return hasProjections | ||
| }) | ||
| if hasProjections { | ||
| return expr, annotations.Annotations{} | ||
| } | ||
|
|
||
| var projection Projection | ||
| return s.optimize(expr, projection) | ||
| } | ||
|
|
||
| func (s setProjectionLabels) optimize(expr Node, projection Projection) (Node, annotations.Annotations) { | ||
| var stop bool | ||
| Traverse(&expr, func(current *Node) { | ||
| if stop { | ||
| return | ||
| } | ||
| switch e := (*current).(type) { | ||
| case *Aggregation: | ||
| switch e.Op { | ||
| case parser.TOPK, parser.BOTTOMK: | ||
| projection = Projection{} | ||
| default: | ||
| projection = Projection{ | ||
| Labels: slices.Clone(e.Grouping), | ||
| Include: !e.Without, | ||
| } | ||
| } | ||
| return | ||
| case *FunctionCall: | ||
| switch e.Func.Name { | ||
| case "absent_over_time", "absent", "scalar": | ||
| projection = Projection{Include: true} | ||
| case "label_replace": | ||
| switch projection.Include { | ||
| case true: | ||
| if slices.Contains(projection.Labels, UnsafeUnwrapString(e.Args[1])) { | ||
| projection.Labels = append(projection.Labels, UnsafeUnwrapString(e.Args[3])) | ||
| } | ||
| case false: | ||
| if !slices.Contains(projection.Labels, UnsafeUnwrapString(e.Args[1])) { | ||
| projection.Labels = slices.DeleteFunc(projection.Labels, func(s string) bool { | ||
| return s == UnsafeUnwrapString(e.Args[3]) | ||
| }) | ||
| } | ||
| } | ||
| } | ||
| case *Binary: | ||
| var highCard, lowCard = e.LHS, e.RHS | ||
| if e.VectorMatching == nil || (!e.VectorMatching.On && len(e.VectorMatching.MatchingLabels) == 0) { | ||
| if IsConstantExpr(lowCard) { | ||
| s.optimize(highCard, projection) | ||
| } else { | ||
| s.optimize(highCard, Projection{}) | ||
| } | ||
|
|
||
| if IsConstantExpr(highCard) { | ||
| s.optimize(lowCard, projection) | ||
| } else { | ||
| s.optimize(lowCard, Projection{}) | ||
| } | ||
| stop = true | ||
| return | ||
| } | ||
| if e.VectorMatching.Card == parser.CardOneToMany { | ||
| highCard, lowCard = lowCard, highCard | ||
| } | ||
|
|
||
| hcProjection := extendProjection(projection, e.VectorMatching.MatchingLabels) | ||
| s.optimize(highCard, hcProjection) | ||
| lcProjection := extendProjection(Projection{ | ||
| Include: e.VectorMatching.On, | ||
| Labels: append([]string{seriesIDLabel}, e.VectorMatching.MatchingLabels...), | ||
| }, e.VectorMatching.Include) | ||
| s.optimize(lowCard, lcProjection) | ||
| stop = true | ||
| case *VectorSelector: | ||
| slices.Sort(projection.Labels) | ||
| projection.Labels = slices.Compact(projection.Labels) | ||
| e.Projection = projection | ||
| projection = Projection{} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you also need to change MergeSelect or just modify the scanner to get it work with Projection? Like the selects you merged might have different projection labels and you need to fetch both and project only required labels for each select
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm this is a good question, we do not use |
||
| } | ||
| }) | ||
|
|
||
| return expr, annotations.Annotations{} | ||
| } | ||
|
|
||
| func extendProjection(projection Projection, lbls []string) Projection { | ||
| var extendedLabels []string | ||
| if projection.Include { | ||
| extendedLabels = union(projection.Labels, lbls) | ||
| } else { | ||
| extendedLabels = intersect(projection.Labels, lbls) | ||
| } | ||
| return Projection{ | ||
| Include: projection.Include, | ||
| Labels: extendedLabels, | ||
| } | ||
| } | ||
|
|
||
| // union returns the union of two string slices. | ||
| func union(l1 []string, l2 []string) []string { | ||
| m := make(map[string]struct{}) | ||
| for _, s := range l1 { | ||
| m[s] = struct{}{} | ||
| } | ||
| for _, s := range l2 { | ||
| m[s] = struct{}{} | ||
| } | ||
| return slices.Collect(maps.Keys(m)) | ||
| } | ||
|
|
||
// intersect returns the intersection of two string slices. Elements are
// emitted in the order (and multiplicity) in which they appear in l2.
func intersect(l1 []string, l2 []string) []string {
	members := make(map[string]struct{}, len(l1))
	for _, name := range l1 {
		members[name] = struct{}{}
	}
	var common []string
	for _, name := range l2 {
		if _, ok := members[name]; ok {
			common = append(common, name)
		}
	}
	return common
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,236 @@ | ||
| package logicalplan | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "strings" | ||
| "testing" | ||
|
|
||
| "github.com/prometheus/prometheus/promql/parser" | ||
| "github.com/prometheus/prometheus/util/annotations" | ||
| "github.com/stretchr/testify/require" | ||
| "github.com/thanos-io/promql-engine/query" | ||
| ) | ||
|
|
||
// reSpaces strips newlines and tabs so that multi-line expected strings can be
// compared against the single-line output of Node.String().
var reSpaces = strings.NewReplacer("\n", "", "\t", "")
|
|
||
| func TestOptimizeSetProjectionLabels(t *testing.T) { | ||
| cases := []struct { | ||
| name string | ||
| expr string | ||
| expected string | ||
| }{ | ||
| { | ||
| name: "simple vector selector", | ||
| expr: `metric_a{job="api-server"}`, | ||
| expected: `metric_a{job="api-server"}[exclude()]`, | ||
| }, | ||
| { | ||
| name: "top-level label_replace", | ||
| expr: `label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)")`, | ||
| expected: `label_replace(kube_node_info{node="gke-1"}[exclude()], "instance", "$1", "node", "(.*)")`, | ||
| }, | ||
| { | ||
| name: "sum by all labels", | ||
| expr: `sum(label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`, | ||
| expected: `sum(label_replace(kube_node_info{node="gke-1"}[project()], "instance", "$1", "node", "(.*)"))`, | ||
| }, | ||
| { | ||
| name: "sum by target label", | ||
| expr: `sum by (instance) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`, | ||
| expected: `sum by (instance) (label_replace(kube_node_info{node="gke-1"}[project(instance, node)], "instance", "$1", "node", "(.*)"))`, | ||
| }, | ||
| { | ||
| name: "sum not including target label", | ||
| expr: `sum by (node, region) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`, | ||
| expected: `sum by (node, region) (label_replace(kube_node_info{node="gke-1"}[project(node, region)], "instance", "$1", "node", "(.*)"))`, | ||
| }, | ||
| { | ||
| name: "sum by source and target label", | ||
| expr: `sum by (node, instance, region) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`, | ||
| expected: `sum by (node, instance, region) (label_replace(kube_node_info{node="gke-1"}[project(instance, node, region)], "instance", "$1", "node", "(.*)"))`, | ||
| }, | ||
| { | ||
| name: "multiple label replace calls", | ||
| expr: ` | ||
| sum by (instance, node, region) ( | ||
| label_replace( | ||
| label_replace(kube_node_info{node="gke-1"}, "ip-addr", "$1", "ip", "(.*)"), | ||
| "instance", "$1", "node", "(.*)" | ||
| ) | ||
| )`, | ||
| expected: `sum by (instance, node, region) (label_replace(label_replace(kube_node_info{node="gke-1"}[project(instance, node, region)], "ip-addr", "$1", "ip", "(.*)"), "instance", "$1", "node", "(.*)"))`, | ||
| }, | ||
| { | ||
| name: "sum without", | ||
| expr: `sum without (xyz) (label_replace(kube_node_info{node="gke-1"}, "instance", "$1", "node", "(.*)"))`, | ||
| expected: `sum without (xyz) (label_replace(kube_node_info{node="gke-1"}[exclude(xyz)], "instance", "$1", "node", "(.*)"))`, | ||
| }, | ||
| { | ||
| name: "absent", | ||
| expr: `absent(kube_node_info{node="gke-1"})`, | ||
| expected: `absent(kube_node_info{node="gke-1"}[project()])`, | ||
| }, | ||
| { | ||
| name: "aggregation with grouping", | ||
| expr: `sum by (pod) (kube_node_info{node="gke-1"})`, | ||
| expected: `sum by (pod) (kube_node_info{node="gke-1"}[project(pod)])`, | ||
| }, | ||
| { | ||
| name: "double aggregation with grouping", | ||
| expr: `max by (pod) (sum by (pod, target) (kube_node_info{node="gke-1"}))`, | ||
| expected: `max by (pod) (sum by (pod, target) (kube_node_info{node="gke-1"}[project(pod, target)]))`, | ||
| }, | ||
| { | ||
| name: "double aggregation with by and without grouping", | ||
| expr: `max by (pod) (sum without (pod, target) (kube_node_info{node="gke-1"}))`, | ||
| expected: `max by (pod) (sum without (pod, target) (kube_node_info{node="gke-1"}[exclude(pod, target)]))`, | ||
| }, | ||
| { | ||
| name: "double aggregation with by and without grouping", | ||
| expr: `max by (pod) (sum without (target) (kube_node_info{node="gke-1"}))`, | ||
| expected: `max by (pod) (sum without (target) (kube_node_info{node="gke-1"}[exclude(target)]))`, | ||
| }, | ||
| { | ||
| name: "aggregation without grouping", | ||
| expr: `sum without (pod) (kube_node_info{node="gke-1"})`, | ||
| expected: `sum without (pod) (kube_node_info{node="gke-1"}[exclude(pod)])`, | ||
| }, | ||
| { | ||
| name: "aggregation with binary expression", | ||
| expr: `sum without (pod) (metric_a * on (node) metric_b)`, | ||
| expected: `sum without (pod) (metric_a[exclude()] * on (node) metric_b[project(__series__id, node)])`, | ||
| }, | ||
| { | ||
| name: "binary expression with vector and constant", | ||
| expr: `sum(metric_a * 3)`, | ||
| expected: `sum(metric_a[project()] * 3)`, | ||
| }, | ||
| { | ||
| name: "binary expression with aggregation and constant", | ||
| expr: `sum(metric_a) * 3`, | ||
| expected: `sum(metric_a[project()]) * 3`, | ||
| }, | ||
| { | ||
| name: "binary expression with one to one matching", | ||
| expr: `metric_a - metric_b`, | ||
| expected: `metric_a[exclude()] - metric_b[exclude()]`, | ||
| }, | ||
| { | ||
| name: "binary expression with one to one matching on label", | ||
| expr: `metric_a - on (node) metric_b`, | ||
| expected: `metric_a[exclude()] - on (node) metric_b[project(__series__id, node)]`, | ||
| }, | ||
| { | ||
| name: "binary expression with one to one matching on label group_left", | ||
| expr: `metric_a - on (node) group_left (cluster) metric_b`, | ||
| expected: `metric_a[exclude()] - on (node) group_left (cluster) metric_b[project(__series__id, cluster, node)]`, | ||
| }, | ||
| { | ||
| name: "binary expression with one to one matching on label group_right", | ||
| expr: `metric_a - on (node) group_right (cluster) metric_b`, | ||
| expected: `metric_a[project(__series__id, cluster, node)] - on (node) group_right (cluster) metric_b[exclude()]`, | ||
| }, | ||
| { | ||
| name: "aggregation with binary expression and one to one matching", | ||
| expr: `max by (k8s_cluster) (metric_a * up)`, | ||
| expected: `max by (k8s_cluster) (metric_a[exclude()] * up[exclude()])`, | ||
| }, | ||
| { | ||
| name: "aggregation with binary expression with one to one matching on one label", | ||
| expr: `max by (k8s_cluster) (metric_a * on(node) up)`, | ||
| expected: `max by (k8s_cluster) (metric_a[project(k8s_cluster, node)] * on (node) up[project(__series__id, node)])`, | ||
| }, | ||
| { | ||
| name: "aggregation with binary expression with matching one label group_left", | ||
| expr: `max by (k8s_cluster) (metric_a * on(node) group_left(hostname) up)`, | ||
| expected: `max by (k8s_cluster) (metric_a[project(k8s_cluster, node)] * on (node) group_left (hostname) up[project(__series__id, hostname, node)])`, | ||
| }, | ||
| { | ||
| name: "aggregation with binary expression with matching one label group_right", | ||
| expr: `max by (k8s_cluster) (metric_a * on(node) group_right(hostname) up)`, | ||
| expected: `max by (k8s_cluster) (metric_a[project(__series__id, hostname, node)] * on (node) group_right (hostname) up[project(k8s_cluster, node)])`, | ||
| }, | ||
| { | ||
| name: "binary expression with aggregation and label replace", | ||
| expr: ` | ||
| topk(5, | ||
| sum by (k8s_cluster) ( | ||
| max(metric_a) by (node) | ||
| * on(node) group_right(kubernetes_io_hostname) label_replace(label_replace(label_replace(up, "node", "$1", "kubernetes_io_hostname", "(.*)"),"node_role", "$1", "role", "(.*)"), "region", "$1", "topology_kubernetes_io_region", "(.*)") | ||
| * on(k8s_cluster) group_left(project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*") | ||
| ) | ||
| )`, | ||
| expected: ` | ||
| topk(5, | ||
| sum by (k8s_cluster) ( | ||
| max by (node) (metric_a[project(node)]) | ||
| * on (node) group_right (kubernetes_io_hostname) label_replace(label_replace(label_replace(up[project(k8s_cluster, kubernetes_io_hostname, node)], "node", "$1", "kubernetes_io_hostname", "(.*)"), "node_role", "$1", "role", "(.*)"), "region", "$1", "topology_kubernetes_io_region", "(.*)") | ||
| * on (k8s_cluster) group_left (project) label_replace(k8s_cluster_info[project(__series__id, cluster, k8s_cluster, project)], "k8s_cluster", "$0", "cluster", ".*")))`, | ||
| }, | ||
| { | ||
| name: "binary expression with aggregation and label replace", | ||
| expr: ` | ||
| count by (cluster) ( | ||
| label_replace(up, "region", "$0", "region", ".*") | ||
| * on(cluster, region) group_left(project) label_replace(max by(project, region, cluster)(k8s_cluster_info), "k8s_cluster", "$0", "cluster", ".*") | ||
| )`, | ||
| expected: ` | ||
| count by (cluster) ( | ||
| label_replace(up[project(cluster, region)], "region", "$0", "region", ".*") | ||
| * on (cluster, region) group_left (project) label_replace(max by (project, region, cluster) ( | ||
| k8s_cluster_info[project(cluster, project, region)]), "k8s_cluster", "$0", "cluster", ".*"))`, | ||
| }, | ||
| } | ||
|
|
||
| for _, c := range cases { | ||
| t.Run(c.expr, func(t *testing.T) { | ||
| expr, err := parser.ParseExpr(c.expr) | ||
| require.NoError(t, err) | ||
| plan := NewFromAST(expr, &query.Options{}, PlanOptions{}) | ||
| optimized, annos := plan.Optimize( | ||
| []Optimizer{ | ||
| setProjectionLabels{}, | ||
| // This is a dummy optimizer that replaces VectorSelectors with a custom struct | ||
| // which has a custom String() method. | ||
| swapVectorSelectors{}, | ||
| }) | ||
|
|
||
| require.Equal(t, annotations.Annotations{}, annos) | ||
| require.Equal(t, reSpaces.Replace(c.expected), reSpaces.Replace(optimized.Root().String())) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| type swapVectorSelectors struct{} | ||
|
|
||
| func (s swapVectorSelectors) Optimize(plan Node, _ *query.Options) (Node, annotations.Annotations) { | ||
| TraverseBottomUp(nil, &plan, func(_, expr *Node) bool { | ||
| switch v := (*expr).(type) { | ||
| case *VectorSelector: | ||
| *expr = newVectorOutput(v) | ||
| return true | ||
| } | ||
| return false | ||
| }) | ||
| return plan, annotations.Annotations{} | ||
| } | ||
|
|
||
| type vectorOutput struct { | ||
| *VectorSelector | ||
| } | ||
|
|
||
| func newVectorOutput(vectorSelector *VectorSelector) *vectorOutput { | ||
| return &vectorOutput{ | ||
| VectorSelector: vectorSelector, | ||
| } | ||
| } | ||
|
|
||
| func (vs vectorOutput) String() string { | ||
| var projectionType string | ||
| if vs.Projection.Include { | ||
| projectionType = "project" | ||
| } else { | ||
| projectionType = "exclude" | ||
| } | ||
| return fmt.Sprintf("%s[%s(%s)]", vs.VectorSelector.String(), projectionType, strings.Join(vs.Projection.Labels, ", ")) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I know how your scanner is implemented when it is a without? Does it return series hash label in this case?
https://github.com/thanos-io/promql-engine/pull/549/files#diff-2d378c97867270a9cf6883930a4a229b3928d637e4c5c558abf5f4bd913f2f8cR41
This is what I implemented as a mock storage which does projection and always return series hash when projection is available. So maybe there is some difference here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When it is `without`, it returns the hash over all labels, and the labels which are not in the `Labels` slice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to include series hash in the grouping label for without? So that it can be removed after the aggregation.
Or you have another place to remove series hash label?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll need to take a look at the details, will post a bit later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we don't need to include it, because the hash is just an implementation detail of the storage. We use it for things like: partitioning, mapping series to iterators, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We include the hash label only when Include is true. We also have an explicit select for the series hash label for some binary joins as shown in tests.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't have hash label when without then how do we pass the duplicate labels check in the engine? Do we have to disable it.
I am trying to run some correctness tests and saw some issues based on my setup. I assume something wrong in my mock storage implementation. Maybe you can try to add a similar correctness test using a mock storage and see if there is any correctness issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We disabled those duplicate checks, and afaik prometheus has a feature to do that now as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. But I can still see some correctness issues after disabling duplicate check so maybe you can take a closer look
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's disable them here too - with a hash column we won't get issues though since we never drop it i think