Skip to content

Commit 3d1fa98

Browse files
authored
Query-level resource usages tracking (#13172)
* Query-level resource usages tracking Signed-off-by: Chenyang Ji <cyji@amazon.com> * Moving TaskResourceTrackingService to clusterService Signed-off-by: Chenyang Ji <cyji@amazon.com> * use shard response header to piggyback task resource usages Signed-off-by: Chenyang Ji <cyji@amazon.com> * split changes for query insights plugin Signed-off-by: Chenyang Ji <cyji@amazon.com> * improve the supplier logic and other misc items Signed-off-by: Chenyang Ji <cyji@amazon.com> * track resource usage for failed requests Signed-off-by: Chenyang Ji <cyji@amazon.com> * move resource usages interactions into TaskResourceTrackingService Signed-off-by: Chenyang Ji <cyji@amazon.com> --------- Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent cfaa196 commit 3d1fa98

29 files changed

+763
-66
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2121
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
2222
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
2323
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
24+
- Add support for query-level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
2425

2526
### Dependencies
2627
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/ResourceUsageInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ public long getTotalValue() {
104104
return endValue.get() - startValue;
105105
}
106106

107+
public long getStartValue() {
108+
return startValue;
109+
}
110+
107111
@Override
108112
public String toString() {
109113
return String.valueOf(getTotalValue());
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.tasks.resourcetracker;
10+
11+
import org.opensearch.common.annotation.PublicApi;
12+
import org.opensearch.core.ParseField;
13+
import org.opensearch.core.common.Strings;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.common.io.stream.StreamOutput;
16+
import org.opensearch.core.common.io.stream.Writeable;
17+
import org.opensearch.core.xcontent.ConstructingObjectParser;
18+
import org.opensearch.core.xcontent.MediaTypeRegistry;
19+
import org.opensearch.core.xcontent.ToXContentObject;
20+
import org.opensearch.core.xcontent.XContentBuilder;
21+
22+
import java.io.IOException;
23+
import java.util.Objects;
24+
25+
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
26+
27+
/**
28+
* Task resource usage information with minimal information about the task
29+
* <p>
30+
* Writeable TaskResourceInfo objects are used to represent resource usage
31+
* information of running tasks, which can be propagated to coordinator node
32+
* to infer query-level resource usage
33+
*
34+
* @opensearch.api
35+
*/
36+
@PublicApi(since = "2.15.0")
37+
public class TaskResourceInfo implements Writeable, ToXContentObject {
38+
private final String action;
39+
private final long taskId;
40+
private final long parentTaskId;
41+
private final String nodeId;
42+
private final TaskResourceUsage taskResourceUsage;
43+
44+
private static final ParseField ACTION = new ParseField("action");
45+
private static final ParseField TASK_ID = new ParseField("taskId");
46+
private static final ParseField PARENT_TASK_ID = new ParseField("parentTaskId");
47+
private static final ParseField NODE_ID = new ParseField("nodeId");
48+
private static final ParseField TASK_RESOURCE_USAGE = new ParseField("taskResourceUsage");
49+
50+
public TaskResourceInfo(
51+
final String action,
52+
final long taskId,
53+
final long parentTaskId,
54+
final String nodeId,
55+
final TaskResourceUsage taskResourceUsage
56+
) {
57+
this.action = action;
58+
this.taskId = taskId;
59+
this.parentTaskId = parentTaskId;
60+
this.nodeId = nodeId;
61+
this.taskResourceUsage = taskResourceUsage;
62+
}
63+
64+
public static final ConstructingObjectParser<TaskResourceInfo, Void> PARSER = new ConstructingObjectParser<>(
65+
"task_resource_info",
66+
a -> new Builder().setAction((String) a[0])
67+
.setTaskId((Long) a[1])
68+
.setParentTaskId((Long) a[2])
69+
.setNodeId((String) a[3])
70+
.setTaskResourceUsage((TaskResourceUsage) a[4])
71+
.build()
72+
);
73+
74+
static {
75+
PARSER.declareString(constructorArg(), ACTION);
76+
PARSER.declareLong(constructorArg(), TASK_ID);
77+
PARSER.declareLong(constructorArg(), PARENT_TASK_ID);
78+
PARSER.declareString(constructorArg(), NODE_ID);
79+
PARSER.declareObject(constructorArg(), TaskResourceUsage.PARSER, TASK_RESOURCE_USAGE);
80+
}
81+
82+
@Override
83+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
84+
builder.startObject();
85+
builder.field(ACTION.getPreferredName(), this.action);
86+
builder.field(TASK_ID.getPreferredName(), this.taskId);
87+
builder.field(PARENT_TASK_ID.getPreferredName(), this.parentTaskId);
88+
builder.field(NODE_ID.getPreferredName(), this.nodeId);
89+
builder.startObject(TASK_RESOURCE_USAGE.getPreferredName());
90+
this.taskResourceUsage.toXContent(builder, params);
91+
builder.endObject();
92+
builder.endObject();
93+
return builder;
94+
}
95+
96+
/**
97+
* Builder for {@link TaskResourceInfo}
98+
*/
99+
public static class Builder {
100+
private TaskResourceUsage taskResourceUsage;
101+
private String action;
102+
private long taskId;
103+
private long parentTaskId;
104+
private String nodeId;
105+
106+
public Builder setTaskResourceUsage(final TaskResourceUsage taskResourceUsage) {
107+
this.taskResourceUsage = taskResourceUsage;
108+
return this;
109+
}
110+
111+
public Builder setAction(final String action) {
112+
this.action = action;
113+
return this;
114+
}
115+
116+
public Builder setTaskId(final long taskId) {
117+
this.taskId = taskId;
118+
return this;
119+
}
120+
121+
public Builder setParentTaskId(final long parentTaskId) {
122+
this.parentTaskId = parentTaskId;
123+
return this;
124+
}
125+
126+
public Builder setNodeId(final String nodeId) {
127+
this.nodeId = nodeId;
128+
return this;
129+
}
130+
131+
public TaskResourceInfo build() {
132+
return new TaskResourceInfo(action, taskId, parentTaskId, nodeId, taskResourceUsage);
133+
}
134+
}
135+
136+
/**
137+
* Read task info from a stream.
138+
*
139+
* @param in StreamInput to read
140+
* @return {@link TaskResourceInfo}
141+
* @throws IOException IOException
142+
*/
143+
public static TaskResourceInfo readFromStream(StreamInput in) throws IOException {
144+
return new TaskResourceInfo.Builder().setAction(in.readString())
145+
.setTaskId(in.readLong())
146+
.setParentTaskId(in.readLong())
147+
.setNodeId(in.readString())
148+
.setTaskResourceUsage(TaskResourceUsage.readFromStream(in))
149+
.build();
150+
}
151+
152+
/**
153+
* Get TaskResourceUsage
154+
*
155+
* @return taskResourceUsage
156+
*/
157+
public TaskResourceUsage getTaskResourceUsage() {
158+
return taskResourceUsage;
159+
}
160+
161+
/**
162+
* Get parent task id
163+
*
164+
* @return parent task id
165+
*/
166+
public long getParentTaskId() {
167+
return parentTaskId;
168+
}
169+
170+
/**
171+
* Get task id
172+
* @return task id
173+
*/
174+
public long getTaskId() {
175+
return taskId;
176+
}
177+
178+
/**
179+
* Get node id
180+
* @return node id
181+
*/
182+
public String getNodeId() {
183+
return nodeId;
184+
}
185+
186+
/**
187+
* Get task action
188+
* @return task action
189+
*/
190+
public String getAction() {
191+
return action;
192+
}
193+
194+
@Override
195+
public void writeTo(StreamOutput out) throws IOException {
196+
out.writeString(action);
197+
out.writeLong(taskId);
198+
out.writeLong(parentTaskId);
199+
out.writeString(nodeId);
200+
taskResourceUsage.writeTo(out);
201+
}
202+
203+
@Override
204+
public String toString() {
205+
return Strings.toString(MediaTypeRegistry.JSON, this);
206+
}
207+
208+
@Override
209+
public boolean equals(Object obj) {
210+
if (obj == null || obj.getClass() != TaskResourceInfo.class) {
211+
return false;
212+
}
213+
TaskResourceInfo other = (TaskResourceInfo) obj;
214+
return action.equals(other.action)
215+
&& taskId == other.taskId
216+
&& parentTaskId == other.parentTaskId
217+
&& Objects.equals(nodeId, other.nodeId)
218+
&& taskResourceUsage.equals(other.taskResourceUsage);
219+
}
220+
221+
@Override
222+
public int hashCode() {
223+
return Objects.hash(action, taskId, parentTaskId, nodeId, taskResourceUsage);
224+
}
225+
}

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.core.action.ActionListener;
5252
import org.opensearch.core.action.ShardOperationFailedException;
5353
import org.opensearch.core.index.shard.ShardId;
54+
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
5455
import org.opensearch.search.SearchPhaseResult;
5556
import org.opensearch.search.SearchShardTarget;
5657
import org.opensearch.search.internal.AliasFilter;
@@ -469,6 +470,10 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
469470
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
470471
}
471472

473+
private void onRequestFailure(SearchRequestContext searchRequestContext) {
474+
this.searchRequestContext.getSearchRequestOperationsListener().onRequestFailure(this, searchRequestContext);
475+
}
476+
472477
private void executePhase(SearchPhase phase) {
473478
Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
474479
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
@@ -507,6 +512,7 @@ ShardSearchFailure[] buildShardFailures() {
507512
private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
508513
// we always add the shard failure for a specific shard instance
509514
// we do make sure to clean it on a successful response from a shard
515+
setPhaseResourceUsages();
510516
onShardFailure(shardIndex, shard, e);
511517
SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance()
512518
.findNext(shardIt, clusterState, e, () -> totalOps.incrementAndGet());
@@ -618,9 +624,15 @@ protected void onShardResult(Result result, SearchShardIterator shardIt) {
618624
if (logger.isTraceEnabled()) {
619625
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
620626
}
627+
this.setPhaseResourceUsages();
621628
results.consumeResult(result, () -> onShardResultConsumed(result, shardIt));
622629
}
623630

631+
public void setPhaseResourceUsages() {
632+
TaskResourceInfo taskResourceUsage = searchRequestContext.getTaskResourceUsageSupplier().get();
633+
searchRequestContext.recordPhaseResourceUsage(taskResourceUsage);
634+
}
635+
624636
private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
625637
successfulOps.incrementAndGet();
626638
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
@@ -751,6 +763,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
751763

752764
@Override
753765
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
766+
setPhaseResourceUsages();
754767
if (currentPhaseHasLifecycle) {
755768
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
756769
}
@@ -780,6 +793,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
780793
});
781794
}
782795
Releasables.close(releasables);
796+
onRequestFailure(searchRequestContext);
783797
listener.onFailure(exception);
784798
}
785799

server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ private void executeFetch(
240240
public void innerOnResponse(FetchSearchResult result) {
241241
try {
242242
progressListener.notifyFetchResult(shardIndex);
243+
context.setPhaseResourceUsages();
243244
counter.onResult(result);
244245
} catch (Exception e) {
245246
context.onPhaseFailure(FetchSearchPhase.this, "", e);
@@ -254,6 +255,7 @@ public void onFailure(Exception e) {
254255
e
255256
);
256257
progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
258+
context.setPhaseResourceUsages();
257259
counter.onFailure(shardIndex, shardTarget, e);
258260
} finally {
259261
// the search context might not be cleared on the node where the fetch was executed for example

server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,9 @@ default void sendReleaseSearchContext(
150150
* Registers a {@link Releasable} that will be closed when the search request finishes or fails.
151151
*/
152152
void addReleasable(Releasable releasable);
153+
154+
/**
155+
* Records the task resource usage info for the current search phase.
156+
*/
157+
void setPhaseResourceUsages();
153158
}

0 commit comments

Comments
 (0)