Skip to content

Expose more detailed profiling information #126525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 15, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;

/**
Expand Down Expand Up @@ -2665,6 +2666,12 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX
return request;
}

protected static MapMatcher getProfileMatcher() {
return matchesMap().entry("query", instanceOf(Map.class))
.entry("planning", instanceOf(Map.class))
.entry("drivers", instanceOf(List.class));
}

protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial) {
MapMatcher mapMatcher = matchesMap();
if (includeMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void testProfile() throws IOException {
Map<String, Object> result = runEsql(builder);
assertResultMap(
result,
getResultMatcher(result).entry("profile", matchesMap().entry("drivers", instanceOf(List.class))),
getResultMatcher(result).entry("profile", getProfileMatcher()),
matchesList().item(matchesMap().entry("name", "AVG(value)").entry("type", "double")),
equalTo(List.of(List.of(499.5d)))
);
Expand Down Expand Up @@ -502,7 +502,7 @@ public void testInlineStatsProfile() throws IOException {
}
assertResultMap(
result,
getResultMatcher(result).entry("profile", matchesMap().entry("drivers", instanceOf(List.class))),
getResultMatcher(result).entry("profile", getProfileMatcher()),
matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date"))
.item(matchesMap().entry("name", "test").entry("type", "text"))
.item(matchesMap().entry("name", "test.keyword").entry("type", "keyword"))
Expand Down Expand Up @@ -605,7 +605,7 @@ public void testForceSleepsProfile() throws IOException {
}
assertResultMap(
result,
getResultMatcher(result).entry("profile", matchesMap().entry("drivers", instanceOf(List.class))),
getResultMatcher(result).entry("profile", getProfileMatcher()),
matchesList().item(matchesMap().entry("name", "AVG(value)").entry("type", "double"))
.item(matchesMap().entry("name", "MAX(value)").entry("type", "long"))
.item(matchesMap().entry("name", "MIN(value)").entry("type", "long"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -67,17 +66,20 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
// Updates to the Cluster occur with the updateCluster method that given the key to map transforms an
// old Cluster Object to a new Cluster Object with the remapping function.
public final Map<String, Cluster> clusterInfo;
private TimeValue overallTook;
// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
private final boolean includeCCSMetadata;

// fields that are not Writeable since they are only needed on the primary CCS coordinator
private final transient Predicate<String> skipUnavailablePredicate;
private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved down and converted to TimeSpans

private volatile boolean isPartial; // Does this request have partial results?
private transient volatile boolean isStopped; // Have we received stop command?

// start time for the ESQL query for calculating time spans relative to the beginning of the query
private final transient TimeSpan.Builder relativeStart;
private transient TimeSpan overallTimeSpan;
private transient TimeSpan planningTimeSpan; // time elapsed since start of query to calling ComputeService.execute
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this actually need to be serialized in order to pass this information when executing query async

private TimeValue overallTook;

public EsqlExecutionInfo(boolean includeCCSMetadata) {
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
}
Expand All @@ -90,18 +92,17 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
this.skipUnavailablePredicate = skipUnavailablePredicate;
this.includeCCSMetadata = includeCCSMetadata;
this.relativeStartNanos = System.nanoTime();
this.relativeStart = TimeSpan.start();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be always present on coordinating node

}

/**
* For testing use with fromXContent parsing only
* @param clusterInfo
*/
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
this.clusterInfo = clusterInfo;
this.includeCCSMetadata = includeCCSMetadata;
this.skipUnavailablePredicate = Predicates.always();
this.relativeStartNanos = null;
this.relativeStart = null;
}

public EsqlExecutionInfo(StreamInput in) throws IOException {
Expand All @@ -127,7 +128,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
}

this.skipUnavailablePredicate = Predicates.always();
this.relativeStartNanos = null;
this.relativeStart = null;
}

@Override
Expand All @@ -150,31 +151,28 @@ public boolean includeCCSMetadata() {
return includeCCSMetadata;
}

public Long getRelativeStartNanos() {
return relativeStartNanos;
}

/**
* Call when ES|QL "planning" phase is complete and query execution (in ComputeService) is about to start.
* Note this is currently only built for a single phase planning/execution model. When INLINESTATS
* moves towards GA we may need to revisit this model. Currently, it should never be called more than once.
*/
public void markEndPlanning() {
assert planningTookTime == null : "markEndPlanning should only be called once";
assert relativeStartNanos != null : "Relative start time must be set when markEndPlanning is called";
planningTookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
assert planningTimeSpan == null : "markEndPlanning should only be called once";
assert relativeStart != null : "Relative start time must be set when markEndPlanning is called";
planningTimeSpan = relativeStart.stop();
}

public TimeValue planningTookTime() {
return planningTookTime;
return planningTimeSpan != null ? planningTimeSpan.toTimeValue() : null;
}

/**
* Call when ES|QL execution is complete in order to set the overall took time for an ES|QL query.
*/
public void markEndQuery() {
assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
assert relativeStart != null : "Relative start time must be set when markEndQuery is called";
overallTimeSpan = relativeStart.stop();
overallTook = overallTimeSpan.toTimeValue();
}

// for testing only - use markEndQuery in production code
Expand All @@ -190,11 +188,15 @@ public TimeValue overallTook() {
* How much time the query took since starting.
*/
public TimeValue tookSoFar() {
if (relativeStartNanos == null) {
return new TimeValue(0);
} else {
return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
}
return relativeStart != null ? relativeStart.stop().toTimeValue() : TimeValue.ZERO;
}

public TimeSpan overallTimeSpan() {
return overallTimeSpan;
}

public TimeSpan planningTimeSpan() {
return planningTimeSpan;
}

public Set<String> clusterAliases() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,6 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
)
: ResponseXContentUtils.allColumns(columns, "columns");
Iterator<? extends ToXContent> valuesIt = ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns);
Iterator<ToXContent> profileRender = profile != null
? ChunkedToXContentHelper.field("profile", profile, params)
: Collections.emptyIterator();
Iterator<ToXContent> executionInfoRender = executionInfo != null && executionInfo.hasMetadataToReport()
? ChunkedToXContentHelper.field("_clusters", executionInfo, params)
: Collections.emptyIterator();
Expand All @@ -238,11 +235,24 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
columnHeadings,
ChunkedToXContentHelper.array("values", valuesIt),
executionInfoRender,
profileRender,
profileRenderer(params),
ChunkedToXContentHelper.endObject()
);
}

private Iterator<ToXContent> profileRenderer(ToXContent.Params params) {
if (profile == null) {
return Collections.emptyIterator();
}
return Iterators.concat(ChunkedToXContentHelper.startObject("profile"), ChunkedToXContentHelper.chunk((b, p) -> {
if (executionInfo != null) {
b.field("query", executionInfo.overallTimeSpan());
b.field("planning", executionInfo.planningTimeSpan());
}
return b;
}), ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params), ChunkedToXContentHelper.endObject());
}

public boolean[] nullColumns() {
boolean[] nullColumns = new boolean[columns.size()];
for (int c = 0; c < nullColumns.length; c++) {
Expand All @@ -260,11 +270,6 @@ private boolean allColumnsAreNull(int c) {
return true;
}

@Override
public boolean isFragment() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -344,7 +349,7 @@ public EsqlResponse responseInternal() {
return esqlResponse;
}

public static class Profile implements Writeable, ChunkedToXContentObject {
public static class Profile implements Writeable {
private final List<DriverProfile> drivers;

public Profile(List<DriverProfile> drivers) {
Expand Down Expand Up @@ -377,15 +382,6 @@ public int hashCode() {
return Objects.hash(drivers);
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
ChunkedToXContentHelper.array("drivers", drivers.iterator(), params),
ChunkedToXContentHelper.endObject()
);
}

List<DriverProfile> drivers() {
return drivers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

import static org.elasticsearch.core.TimeValue.timeValueNanos;

/**
* THis class is used to capture a duration of some process, including start and stop point int time.
*/
public record TimeSpan(long startMillis, long startNanos, long stopMillis, long stopNanos) implements Writeable, ToXContentObject {

public static TimeSpan readFrom(StreamInput in) throws IOException {
return new TimeSpan(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startMillis);
out.writeVLong(startNanos);
out.writeVLong(stopMillis);
out.writeVLong(stopNanos);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis);
builder.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis);
if (builder.humanReadable()) {
builder.field("took_time", toTimeValue());
}
builder.field("took_millis", durationInMillis());
builder.field("took_nanos", durationInNanos());
builder.endObject();
return builder;
}

public TimeValue toTimeValue() {
return timeValueNanos(stopNanos - startNanos);
}

public long durationInMillis() {
return stopMillis - startMillis;
}

public long durationInNanos() {
return stopNanos - startNanos;
}

public static Builder start() {
return new Builder();
}

public static class Builder {

private final long startMillis = System.currentTimeMillis();
private final long startNanos = System.nanoTime();

public TimeSpan stop() {
return new TimeSpan(startMillis, startNanos, System.currentTimeMillis(), System.nanoTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.logging.LogManager;
Expand Down Expand Up @@ -261,7 +260,7 @@ public void execute(
computeListener.acquireCompute().delegateFailure((l, profiles) -> {
if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> {
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
var tookTime = execInfo.tookSoFar();
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime);
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,12 +502,10 @@ public void testUpdateExecutionInfoAtEndOfPlanning() {
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
);
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));

assertNull(executionInfo.planningTookTime());
assertNull(executionInfo.overallTook());
try {
Thread.sleep(1);
} catch (InterruptedException e) {}

safeSleep(1);

EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);

Expand Down