Skip to content

Commit e3740f7

Browse files
authored
Concurrent Search Tasks Response Updates (#7673)
* Add average,min,max and thread info to tasks response Signed-off-by: Jay Deng <jayd0104@gmail.com> * Update server/src/main/java/org/opensearch/tasks/TaskResourceStats.java Signed-off-by: Jay Deng <jayd0104@gmail.com> --------- Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent a9b53e6 commit e3740f7

File tree

11 files changed

+311
-15
lines changed

11 files changed

+311
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8585

8686
### Changed
8787
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836))
88+
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673))
8889

8990
### Deprecated
9091

client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.tasks.Task;
4444
import org.opensearch.tasks.TaskId;
4545
import org.opensearch.tasks.TaskInfo;
46+
import org.opensearch.tasks.TaskThreadUsage;
4647
import org.opensearch.test.OpenSearchTestCase;
4748

4849
import java.io.IOException;
@@ -138,12 +139,12 @@ private static RawTaskStatus randomRawTaskStatus() {
138139
}
139140

140141
private static TaskResourceStats randomResourceStats() {
141-
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
142+
return randomBoolean() ? null : new TaskResourceStats(new HashMap<>() {
142143
{
143144
for (int i = 0; i < randomInt(5); i++) {
144145
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
145146
}
146147
}
147-
});
148+
}, new TaskThreadUsage(randomInt(10), randomInt(10)));
148149
}
149150
}

rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717
- do:
1818
tasks.list:
1919
group_by: parents
20+
detailed: true
21+
- set:
22+
tasks._arbitrary_key_: task_id
2023

2124
- is_true: tasks
25+
- is_true: tasks.$task_id.resource_stats
26+
- is_true: tasks.$task_id.resource_stats.total
2227

2328
---
2429
"tasks_list headers":
@@ -32,3 +37,21 @@
3237

3338
- is_true: tasks
3439
- match: { tasks.0.headers.X-Opaque-Id: "That is me" }
40+
41+
---
42+
"tasks_list detailed":
43+
- skip:
44+
version: " - 2.99.99"
45+
reason: thread_info was introduced in 3.0.0
46+
47+
- do:
48+
tasks.list:
49+
group_by: parents
50+
detailed: true
51+
- set:
52+
tasks._arbitrary_key_: task_id
53+
54+
- is_true: tasks
55+
- is_true: tasks.$task_id.resource_stats
56+
- is_true: tasks.$task_id.resource_stats.thread_info
57+
- is_true: tasks.$task_id.resource_stats.total

server/src/main/java/org/opensearch/tasks/Task.java

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ public class Task {
6767

6868
private static final String TOTAL = "total";
6969

70+
private static final String AVERAGE = "average";
71+
72+
private static final String MIN = "min";
73+
74+
private static final String MAX = "max";
75+
76+
public static final String THREAD_INFO = "thread_info";
77+
7078
private final long id;
7179

7280
private final String type;
@@ -175,8 +183,11 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
175183
resourceStats = new TaskResourceStats(new HashMap<>() {
176184
{
177185
put(TOTAL, getTotalResourceStats());
186+
put(AVERAGE, getAverageResourceStats());
187+
put(MIN, getMinResourceStats());
188+
put(MAX, getMaxResourceStats());
178189
}
179-
});
190+
}, getThreadUsage());
180191
}
181192
return taskInfo(localNodeId, description, status, resourceStats);
182193
}
@@ -289,6 +300,27 @@ public TaskResourceUsage getTotalResourceStats() {
289300
return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY));
290301
}
291302

303+
/**
304+
* Returns current average per-execution resource usage of the task.
305+
*/
306+
public TaskResourceUsage getAverageResourceStats() {
307+
return new TaskResourceUsage(getAverageResourceUtilization(ResourceStats.CPU), getAverageResourceUtilization(ResourceStats.MEMORY));
308+
}
309+
310+
/**
311+
* Returns current min per-execution resource usage of the task.
312+
*/
313+
public TaskResourceUsage getMinResourceStats() {
314+
return new TaskResourceUsage(getMinResourceUtilization(ResourceStats.CPU), getMinResourceUtilization(ResourceStats.MEMORY));
315+
}
316+
317+
/**
318+
* Returns current max per-execution resource usage of the task.
319+
*/
320+
public TaskResourceUsage getMaxResourceStats() {
321+
return new TaskResourceUsage(getMaxResourceUtilization(ResourceStats.CPU), getMaxResourceUtilization(ResourceStats.MEMORY));
322+
}
323+
292324
/**
293325
* Returns total resource consumption for a specific task stat.
294326
*/
@@ -305,6 +337,76 @@ public long getTotalResourceUtilization(ResourceStats stats) {
305337
return totalResourceConsumption;
306338
}
307339

340+
/**
341+
* Returns average per-execution resource consumption for a specific task stat.
342+
*/
343+
private long getAverageResourceUtilization(ResourceStats stats) {
344+
long totalResourceConsumption = 0L;
345+
int threadResourceInfoCount = 0;
346+
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
347+
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
348+
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
349+
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
350+
totalResourceConsumption += statsInfo.getTotalValue();
351+
threadResourceInfoCount++;
352+
}
353+
}
354+
}
355+
return (threadResourceInfoCount > 0) ? totalResourceConsumption / threadResourceInfoCount : 0;
356+
}
357+
358+
/**
359+
* Returns minimum per-execution resource consumption for a specific task stat.
360+
*/
361+
private long getMinResourceUtilization(ResourceStats stats) {
362+
if (resourceStats.size() == 0) {
363+
return 0L;
364+
}
365+
long minResourceConsumption = Long.MAX_VALUE;
366+
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
367+
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
368+
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
369+
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
370+
minResourceConsumption = Math.min(minResourceConsumption, statsInfo.getTotalValue());
371+
}
372+
}
373+
}
374+
return minResourceConsumption;
375+
}
376+
377+
/**
378+
* Returns maximum per-execution resource consumption for a specific task stat.
379+
*/
380+
private long getMaxResourceUtilization(ResourceStats stats) {
381+
long maxResourceConsumption = 0L;
382+
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
383+
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
384+
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
385+
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
386+
maxResourceConsumption = Math.max(maxResourceConsumption, statsInfo.getTotalValue());
387+
}
388+
}
389+
}
390+
return maxResourceConsumption;
391+
}
392+
393+
/**
394+
* Returns the total and active number of thread executions for the task.
395+
*/
396+
public TaskThreadUsage getThreadUsage() {
397+
int numThreadExecutions = 0;
398+
int activeThreads = 0;
399+
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
400+
numThreadExecutions += threadResourceInfosList.size();
401+
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
402+
if (threadResourceInfo.isActive()) {
403+
activeThreads++;
404+
}
405+
}
406+
}
407+
return new TaskThreadUsage(numThreadExecutions, activeThreads);
408+
}
409+
308410
/**
309411
* Adds thread's starting resource consumption information
310412
* @param threadId ID of the thread

server/src/main/java/org/opensearch/tasks/TaskResourceStats.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.tasks;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.common.Strings;
1213
import org.opensearch.common.io.stream.StreamInput;
1314
import org.opensearch.common.io.stream.StreamOutput;
@@ -22,6 +23,8 @@
2223
import java.util.Map;
2324
import java.util.Objects;
2425

26+
import static org.opensearch.tasks.Task.THREAD_INFO;
27+
2528
/**
2629
* Resource information about a currently running task.
2730
* <p>
@@ -32,27 +35,42 @@
3235
*/
3336
public class TaskResourceStats implements Writeable, ToXContentFragment {
3437
private final Map<String, TaskResourceUsage> resourceUsage;
38+
private final TaskThreadUsage threadUsage;
3539

36-
public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage) {
40+
public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage, TaskThreadUsage threadUsage) {
3741
this.resourceUsage = Objects.requireNonNull(resourceUsage, "resource usage is required");
42+
this.threadUsage = Objects.requireNonNull(threadUsage, "thread usage is required");
3843
}
3944

4045
/**
4146
* Read from a stream.
4247
*/
4348
public TaskResourceStats(StreamInput in) throws IOException {
4449
resourceUsage = in.readMap(StreamInput::readString, TaskResourceUsage::readFromStream);
50+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
51+
threadUsage = TaskThreadUsage.readFromStream(in);
52+
} else {
53+
// Initialize TaskThreadUsage in case it is not found in mixed cluster case
54+
threadUsage = new TaskThreadUsage(0, 0);
55+
}
4556
}
4657

4758
@Override
4859
public void writeTo(StreamOutput out) throws IOException {
4960
out.writeMap(resourceUsage, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
61+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
62+
threadUsage.writeTo(out);
63+
}
5064
}
5165

5266
public Map<String, TaskResourceUsage> getResourceUsageInfo() {
5367
return resourceUsage;
5468
}
5569

70+
public TaskThreadUsage getThreadUsage() {
71+
return threadUsage;
72+
}
73+
5674
@Override
5775
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5876
for (Map.Entry<String, TaskResourceUsage> resourceUsageEntry : resourceUsage.entrySet()) {
@@ -62,6 +80,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6280
}
6381
builder.endObject();
6482
}
83+
builder.startObject(THREAD_INFO);
84+
threadUsage.toXContent(builder, params);
85+
builder.endObject();
6586
return builder;
6687
}
6788

@@ -74,17 +95,24 @@ public static TaskResourceStats fromXContent(XContentParser parser) throws IOExc
7495
token = parser.nextToken();
7596
}
7697
final Map<String, TaskResourceUsage> resourceStats = new HashMap<>();
98+
// Initialize TaskThreadUsage in case it is not found in mixed cluster case
99+
TaskThreadUsage threadUsage = new TaskThreadUsage(0, 0);
77100
if (token == XContentParser.Token.FIELD_NAME) {
78101
assert parser.currentToken() == XContentParser.Token.FIELD_NAME : "Expected field name but saw [" + parser.currentToken() + "]";
79102
do {
80103
// Must point to field name
81104
String fieldName = parser.currentName();
82105
// And then the value
83-
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
84-
resourceStats.put(fieldName, value);
106+
107+
if (fieldName.equals(THREAD_INFO)) {
108+
threadUsage = TaskThreadUsage.fromXContent(parser);
109+
} else {
110+
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
111+
resourceStats.put(fieldName, value);
112+
}
85113
} while (parser.nextToken() == XContentParser.Token.FIELD_NAME);
86114
}
87-
return new TaskResourceStats(resourceStats);
115+
return new TaskResourceStats(resourceStats, threadUsage);
88116
}
89117

90118
@Override

0 commit comments

Comments
 (0)