Skip to content

Esql - Add planner information to the profile #124919

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19b95b4
top level plumbing; work in progress
not-napoleon Mar 13, 2025
1e795fc
collected profiles object. WIP, this still doesn't compile
not-napoleon Mar 14, 2025
fa82be2
make collected profiles writable. Still WIP
not-napoleon Mar 14, 2025
838cf28
clean up a few more usages
not-napoleon Mar 14, 2025
bd1567c
Everything Compiles!
not-napoleon Mar 14, 2025
873f2d7
[CI] Auto commit changes from spotless
Mar 14, 2025
9a09236
Profile serialization
not-napoleon Mar 14, 2025
df7f07d
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 14, 2025
f28aa9e
Just use Profie instead of creating a new class
not-napoleon Mar 17, 2025
704d51a
Use profile instead of explicit lists
not-napoleon Mar 17, 2025
ac2efcd
Merge branch 'main' into esql-planner-profile
not-napoleon Mar 17, 2025
c7b782d
fix tests
not-napoleon Mar 17, 2025
73ddcb7
[CI] Auto commit changes from spotless
Mar 17, 2025
62da773
plumb planner profile through to the rule runner
not-napoleon Mar 17, 2025
9c1536c
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 17, 2025
41ac4d4
push the profile one layer deeper into the runner
not-napoleon Mar 18, 2025
9bc9550
add new parameters everywhere
not-napoleon Mar 18, 2025
0530d40
[CI] Auto commit changes from spotless
Mar 18, 2025
aa6b085
rule layout draft
not-napoleon Mar 20, 2025
a8ad96d
Merge branch 'main' into esql-planner-profile
not-napoleon Mar 27, 2025
845c431
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 27, 2025
6658b96
[CI] Auto commit changes from spotless
Mar 27, 2025
b9c07ee
fix some merge errors
not-napoleon Mar 27, 2025
229c11d
lay out data structures for profile
not-napoleon Mar 31, 2025
6987ac0
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 31, 2025
5dd319d
[CI] Auto commit changes from spotless
Mar 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
public static final TransportVersion ESQL_PLANNER_PROFILE = def(9_042_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -346,18 +348,48 @@ public EsqlResponse responseInternal() {

public static class Profile implements Writeable, ChunkedToXContentObject {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I wonder if we could make this a record?
This way we could avoid custom equals/hashCode/toString.
Merge could be updated to return a new instance instead of mutating this one

private final List<DriverProfile> drivers;
private final List<PlannerProfile> plannerProfile;

public Profile(List<DriverProfile> drivers) {
public static final Profile EMPTY = new Profile(List.of(), List.of());

public Profile() {
this.drivers = new ArrayList<>();
this.plannerProfile = new ArrayList<>();
}

public Profile(List<DriverProfile> drivers, List<PlannerProfile> plannerProfile) {
this.drivers = drivers;
this.plannerProfile = plannerProfile;
}

public Profile(StreamInput in) throws IOException {
this.drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
this.plannerProfile = in.readCollectionAsImmutableList(PlannerProfile::readFrom);
} else {
this.plannerProfile = List.of();
}
}

public void merge(Profile other) {
this.drivers.addAll(other.drivers);
this.plannerProfile.addAll(other.plannerProfile);
}

public List<DriverProfile> getDriverProfiles() {
return drivers;
}

public List<PlannerProfile> getPlannerProfiles() {
return plannerProfile;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(drivers);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
out.writeCollection(plannerProfile);
}
}

@Override
Expand All @@ -369,12 +401,12 @@ public boolean equals(Object o) {
return false;
}
Profile profile = (Profile) o;
return Objects.equals(drivers, profile.drivers);
return Objects.equals(drivers, profile.drivers) && Objects.equals(plannerProfile, profile.plannerProfile);
}

@Override
public int hashCode() {
return Objects.hash(drivers);
return Objects.hash(drivers, plannerProfile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.Rule;
Expand Down Expand Up @@ -189,8 +190,8 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon

private final Verifier verifier;

public Analyzer(AnalyzerContext context, Verifier verifier) {
super(context);
public Analyzer(AnalyzerContext context, Verifier verifier, PlannerProfile profile) {
super(context, profile);
this.verifier = verifier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
Expand All @@ -43,9 +42,16 @@ public class PlanExecutor {
private final Metrics metrics;
private final Verifier verifier;
private final PlanTelemetryManager planTelemetryManager;
private final String nodeName;
private final EsqlQueryLog queryLog;

public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState, EsqlQueryLog queryLog) {
public PlanExecutor(
IndexResolver indexResolver,
MeterRegistry meterRegistry,
XPackLicenseState licenseState,
EsqlQueryLog queryLog,
String nodeName
) {
this.indexResolver = indexResolver;
this.preAnalyzer = new PreAnalyzer();
this.functionRegistry = new EsqlFunctionRegistry();
Expand All @@ -54,6 +60,7 @@ public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XP
this.verifier = new Verifier(metrics, licenseState);
this.planTelemetryManager = new PlanTelemetryManager(meterRegistry);
this.queryLog = queryLog;
this.nodeName = nodeName;
}

public void esql(
Expand All @@ -71,12 +78,13 @@ public void esql(
final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
final var session = new EsqlSession(
sessionId,
nodeName,
cfg,
indexResolver,
enrichPolicyResolver,
preAnalyzer,
functionRegistry,
new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)),
new LogicalOptimizerContext(cfg, foldContext),
mapper,
verifier,
planTelemetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceMissingFieldWithNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.Rule;

Expand All @@ -33,8 +34,8 @@
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {

public LocalLogicalPlanOptimizer(LocalLogicalOptimizerContext localLogicalOptimizerContext) {
super(localLogicalOptimizerContext);
public LocalLogicalPlanOptimizer(LocalLogicalOptimizerContext localLogicalOptimizerContext, PlannerProfile profile) {
super(localLogicalOptimizerContext, profile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialDocValuesExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialShapeBoundsExtraction;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.Rule;

Expand All @@ -36,8 +37,8 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;

public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
super(context);
public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context, PlannerProfile profile) {
super(context, profile);
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogates;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateMetricsAggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;

import java.util.List;
Expand Down Expand Up @@ -95,8 +96,8 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;

public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
super(optimizerContext);
public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext, PlannerProfile profile) {
super(optimizerContext, profile);
}

public LogicalPlan optimize(LogicalPlan verified) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.RuleExecutor;

Expand All @@ -29,8 +30,8 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;

public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
super(context);
public PhysicalPlanOptimizer(PhysicalOptimizerContext context, PlannerProfile profile) {
super(context, profile);
}

public PhysicalPlan optimize(PhysicalPlan plan) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Stores profiling information about the query plan. This can be the top level planning on the coordinating node, or the local
* planning on the data nodes.
*/
public class PlannerProfile implements Writeable, ChunkedToXContentObject {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is still work in progress, but I would suggest we try to design it to be a record from the very beginning


public static final class RuleProfile implements Writeable, ChunkedToXContentObject {
private final List<Long> runDurations;
private int runsWithChanges;

public RuleProfile() {
this.runDurations = new ArrayList<>();
this.runsWithChanges = 0;
}

public RuleProfile(List<Long> durationNanos, int runsWithChanges) {
this.runDurations = durationNanos;
this.runsWithChanges = runsWithChanges;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(runDurations, StreamOutput::writeLong);
out.writeInt(runsWithChanges);
}

public RuleProfile readFrom(StreamInput in) throws IOException {
List<Long> runDurations = in.readCollectionAsImmutableList(StreamInput::readLong);
int runsWithChanges = in.readInt();
return new RuleProfile(runDurations, runsWithChanges);
}

public void addRun(long runDuration, boolean hasChanges) {
runDurations.add(runDuration);
if (hasChanges) {
runsWithChanges++;
}
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
// NOCOMMIT
return null;
}

// NOCOMMIT equals and hashcode, once the fields are stable
}

public static final PlannerProfile EMPTY = new PlannerProfile(false, "");

private final boolean isLocalPlanning;
private final String nodeName;

private Map<String, Map<String, RuleProfile>> ruleProfiles;

public PlannerProfile(boolean isLocalPlanning, String nodeName) {
this.isLocalPlanning = isLocalPlanning;
this.nodeName = nodeName;
ruleProfiles = new HashMap<>();
}

public static PlannerProfile readFrom(StreamInput in) throws IOException {
// NOCOMMIT - need to read back the profiles
return new PlannerProfile(in.readBoolean(), in.readString());
}

public void recordRuleProfile(String batchName, String ruleName, long runDuration, boolean hasChanges) {
ruleProfiles.computeIfAbsent(batchName, k -> new HashMap<>())
.computeIfAbsent(ruleName, k -> new RuleProfile())
.addRun(runDuration, hasChanges);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isLocalPlanning);
out.writeString(nodeName);
// NOCOMMIT - need to write out the profile map
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
// NOCOMMIT
throw new UnsupportedOperationException();
}
// NOCOMMIT equals and hashcode, once the fields are stable
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,27 @@ public static PhysicalPlan localPlan(
List<SearchExecutionContext> searchContexts,
Configuration configuration,
FoldContext foldCtx,
PhysicalPlan plan
PhysicalPlan plan,
PlannerProfile profile
) {
return localPlan(configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
return localPlan(configuration, foldCtx, plan, SearchContextStats.from(searchContexts), profile);
}

public static PhysicalPlan localPlan(Configuration configuration, FoldContext foldCtx, PhysicalPlan plan, SearchStats searchStats) {
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
var physicalOptimizer = new LocalPhysicalPlanOptimizer(new LocalPhysicalOptimizerContext(configuration, foldCtx, searchStats));
public static PhysicalPlan localPlan(
Configuration configuration,
FoldContext foldCtx,
PhysicalPlan plan,
SearchStats searchStats,
PlannerProfile profile
) {
final var logicalOptimizer = new LocalLogicalPlanOptimizer(
new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats),
profile
);
var physicalOptimizer = new LocalPhysicalPlanOptimizer(
new LocalPhysicalOptimizerContext(configuration, foldCtx, searchStats),
profile
);

return localPlan(plan, logicalOptimizer, physicalOptimizer);
}
Expand Down
Loading