Skip to content

Commit 6da16e1

Browse files
authored
Add cluster manager task throttling stats in nodes stats API (opensearch-project#5790)
* Add cluster manager task throttling stats in nodes stats API Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
1 parent baa6e6e commit 6da16e1

File tree

13 files changed

+177
-9
lines changed

13 files changed

+177
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
107107

108108
## [Unreleased 2.x]
109109
### Added
110+
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))
110111

111112
### Dependencies
112113

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.opensearch.action.support.nodes.BaseNodeResponse;
3737
import org.opensearch.cluster.node.DiscoveryNode;
3838
import org.opensearch.cluster.node.DiscoveryNodeRole;
39+
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
3940
import org.opensearch.common.Nullable;
4041
import org.opensearch.common.io.stream.StreamInput;
4142
import org.opensearch.common.io.stream.StreamOutput;
@@ -122,6 +123,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
122123
@Nullable
123124
private SearchBackpressureStats searchBackpressureStats;
124125

126+
@Nullable
127+
private ClusterManagerThrottlingStats clusterManagerThrottlingStats;
128+
125129
public NodeStats(StreamInput in) throws IOException {
126130
super(in);
127131
timestamp = in.readVLong();
@@ -152,6 +156,12 @@ public NodeStats(StreamInput in) throws IOException {
152156
} else {
153157
searchBackpressureStats = null;
154158
}
159+
160+
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
161+
clusterManagerThrottlingStats = in.readOptionalWriteable(ClusterManagerThrottlingStats::new);
162+
} else {
163+
clusterManagerThrottlingStats = null;
164+
}
155165
}
156166

157167
public NodeStats(
@@ -173,7 +183,8 @@ public NodeStats(
173183
@Nullable ScriptCacheStats scriptCacheStats,
174184
@Nullable IndexingPressureStats indexingPressureStats,
175185
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
176-
@Nullable SearchBackpressureStats searchBackpressureStats
186+
@Nullable SearchBackpressureStats searchBackpressureStats,
187+
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats
177188
) {
178189
super(node);
179190
this.timestamp = timestamp;
@@ -194,6 +205,7 @@ public NodeStats(
194205
this.indexingPressureStats = indexingPressureStats;
195206
this.shardIndexingPressureStats = shardIndexingPressureStats;
196207
this.searchBackpressureStats = searchBackpressureStats;
208+
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
197209
}
198210

199211
public long getTimestamp() {
@@ -308,6 +320,11 @@ public SearchBackpressureStats getSearchBackpressureStats() {
308320
return searchBackpressureStats;
309321
}
310322

323+
@Nullable
324+
public ClusterManagerThrottlingStats getClusterManagerThrottlingStats() {
325+
return clusterManagerThrottlingStats;
326+
}
327+
311328
@Override
312329
public void writeTo(StreamOutput out) throws IOException {
313330
super.writeTo(out);
@@ -336,6 +353,9 @@ public void writeTo(StreamOutput out) throws IOException {
336353
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
337354
out.writeOptionalWriteable(searchBackpressureStats);
338355
}
356+
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
357+
out.writeOptionalWriteable(clusterManagerThrottlingStats);
358+
}
339359
}
340360

341361
@Override
@@ -411,6 +431,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
411431
if (getSearchBackpressureStats() != null) {
412432
getSearchBackpressureStats().toXContent(builder, params);
413433
}
434+
if (getClusterManagerThrottlingStats() != null) {
435+
getClusterManagerThrottlingStats().toXContent(builder, params);
436+
}
414437
return builder;
415438
}
416439
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ public enum Metric {
207207
SCRIPT_CACHE("script_cache"),
208208
INDEXING_PRESSURE("indexing_pressure"),
209209
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
210-
SEARCH_BACKPRESSURE("search_backpressure");
210+
SEARCH_BACKPRESSURE("search_backpressure"),
211+
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling");
211212

212213
private String metricName;
213214

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
119119
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
120120
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
121121
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
122-
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
122+
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
123+
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics)
123124
);
124125
}
125126

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
163163
false,
164164
false,
165165
false,
166+
false,
166167
false
167168
);
168169
List<ShardStats> shardsStats = new ArrayList<>();

server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,29 @@
88

99
package org.opensearch.cluster.service;
1010

11+
import org.opensearch.common.io.stream.StreamInput;
12+
import org.opensearch.common.io.stream.StreamOutput;
13+
import org.opensearch.common.io.stream.Writeable;
1114
import org.opensearch.common.metrics.CounterMetric;
15+
import org.opensearch.common.xcontent.ToXContent;
16+
import org.opensearch.common.xcontent.ToXContentFragment;
17+
import org.opensearch.common.xcontent.XContentBuilder;
1218

19+
import java.io.IOException;
1320
import java.util.Map;
1421
import java.util.concurrent.ConcurrentHashMap;
1522

1623
/**
1724
* Contains stats of Cluster Manager Task Throttling.
1825
* It stores the total cumulative count of throttled tasks per task type.
1926
*/
20-
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener {
27+
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener, Writeable, ToXContentFragment {
2128

22-
private Map<String, CounterMetric> throttledTasksCount = new ConcurrentHashMap<>();
29+
private Map<String, CounterMetric> throttledTasksCount;
30+
31+
public ClusterManagerThrottlingStats() {
32+
throttledTasksCount = new ConcurrentHashMap<>();
33+
}
2334

2435
private void incrementThrottlingCount(String type, final int counts) {
2536
throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts);
@@ -39,4 +50,63 @@ public long getTotalThrottledTaskCount() {
3950
public void onThrottle(String type, int counts) {
4051
incrementThrottlingCount(type, counts);
4152
}
53+
54+
@Override
55+
public void writeTo(StreamOutput out) throws IOException {
56+
out.writeVInt(throttledTasksCount.size());
57+
for (Map.Entry<String, CounterMetric> entry : throttledTasksCount.entrySet()) {
58+
out.writeString(entry.getKey());
59+
out.writeVInt((int) entry.getValue().count());
60+
}
61+
}
62+
63+
public ClusterManagerThrottlingStats(StreamInput in) throws IOException {
64+
int throttledTaskEntries = in.readVInt();
65+
throttledTasksCount = new ConcurrentHashMap<>();
66+
for (int i = 0; i < throttledTaskEntries; i++) {
67+
String taskType = in.readString();
68+
int throttledTaskCount = in.readVInt();
69+
onThrottle(taskType, throttledTaskCount);
70+
}
71+
}
72+
73+
@Override
74+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
75+
builder.startObject("cluster_manager_throttling");
76+
builder.startObject("stats");
77+
builder.field("total_throttled_tasks", getTotalThrottledTaskCount());
78+
builder.startObject("throttled_tasks_per_task_type");
79+
for (Map.Entry<String, CounterMetric> entry : throttledTasksCount.entrySet()) {
80+
builder.field(entry.getKey(), entry.getValue().count());
81+
}
82+
builder.endObject();
83+
builder.endObject();
84+
return builder.endObject();
85+
}
86+
87+
public boolean equals(Object o) {
88+
if (this == o) {
89+
return true;
90+
} else if (o != null && this.getClass() == o.getClass()) {
91+
ClusterManagerThrottlingStats that = (ClusterManagerThrottlingStats) o;
92+
93+
if (this.throttledTasksCount.size() == that.throttledTasksCount.size()) {
94+
for (Map.Entry<String, CounterMetric> entry : this.throttledTasksCount.entrySet()) {
95+
if (that.throttledTasksCount.get(entry.getKey()).count() != entry.getValue().count()) {
96+
return false;
97+
}
98+
}
99+
return true;
100+
}
101+
}
102+
return false;
103+
}
104+
105+
public int hashCode() {
106+
Map<String, Long> countMap = new ConcurrentHashMap<>();
107+
for (Map.Entry<String, CounterMetric> entry : this.throttledTasksCount.entrySet()) {
108+
countMap.put(entry.getKey(), entry.getValue().count());
109+
}
110+
return countMap.hashCode();
111+
}
42112
}

server/src/main/java/org/opensearch/cluster/service/MasterService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,13 @@ public long numberOfThrottledPendingTasks() {
606606
return throttlingStats.getTotalThrottledTaskCount();
607607
}
608608

609+
/**
610+
* Returns the stats of throttled pending tasks.
611+
*/
612+
public ClusterManagerThrottlingStats getThrottlingStats() {
613+
return throttlingStats;
614+
}
615+
609616
/**
610617
* Returns the min version of nodes in cluster
611618
*/

server/src/main/java/org/opensearch/node/NodeService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class NodeService implements Closeable {
8383
private final IndexingPressureService indexingPressureService;
8484
private final AggregationUsageService aggregationUsageService;
8585
private final SearchBackpressureService searchBackpressureService;
86+
private final ClusterService clusterService;
8687

8788
private final Discovery discovery;
8889

@@ -123,6 +124,7 @@ public class NodeService implements Closeable {
123124
this.indexingPressureService = indexingPressureService;
124125
this.aggregationUsageService = aggregationUsageService;
125126
this.searchBackpressureService = searchBackpressureService;
127+
this.clusterService = clusterService;
126128
clusterService.addStateApplier(ingestService);
127129
}
128130

@@ -174,7 +176,8 @@ public NodeStats stats(
174176
boolean scriptCache,
175177
boolean indexingPressure,
176178
boolean shardIndexingPressure,
177-
boolean searchBackpressure
179+
boolean searchBackpressure,
180+
boolean clusterManagerThrottling
178181
) {
179182
// for indices stats we want to include previous allocated shards stats as well (it will
180183
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -197,7 +200,8 @@ public NodeStats stats(
197200
scriptCache ? scriptService.cacheStats() : null,
198201
indexingPressure ? this.indexingPressureService.nodeStats() : null,
199202
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
200-
searchBackpressure ? this.searchBackpressureService.nodeStats() : null
203+
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
204+
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null
201205
);
202206
}
203207

server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.action.admin.cluster.node.stats;
3434

3535
import org.opensearch.cluster.node.DiscoveryNode;
36+
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
3637
import org.opensearch.common.io.stream.BytesStreamOutput;
3738
import org.opensearch.common.io.stream.StreamInput;
3839
import org.opensearch.discovery.DiscoveryStats;
@@ -418,6 +419,21 @@ public void testSerialization() throws IOException {
418419
assertEquals(limited, sum.getCompilationLimitTriggered());
419420
assertEquals(compilations, sum.getCompilations());
420421
}
422+
ClusterManagerThrottlingStats clusterManagerThrottlingStats = nodeStats.getClusterManagerThrottlingStats();
423+
ClusterManagerThrottlingStats deserializedClusterManagerThrottlingStats = deserializedNodeStats
424+
.getClusterManagerThrottlingStats();
425+
if (clusterManagerThrottlingStats == null) {
426+
assertNull(deserializedClusterManagerThrottlingStats);
427+
} else {
428+
assertEquals(
429+
clusterManagerThrottlingStats.getTotalThrottledTaskCount(),
430+
deserializedClusterManagerThrottlingStats.getTotalThrottledTaskCount()
431+
);
432+
assertEquals(
433+
clusterManagerThrottlingStats.getThrottlingCount("test-task"),
434+
deserializedClusterManagerThrottlingStats.getThrottlingCount("test-task")
435+
);
436+
}
421437
}
422438
}
423439
}
@@ -689,6 +705,11 @@ public static NodeStats createNodeStats() {
689705
}
690706
adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats);
691707
}
708+
ClusterManagerThrottlingStats clusterManagerThrottlingStats = null;
709+
if (frequently()) {
710+
clusterManagerThrottlingStats = new ClusterManagerThrottlingStats();
711+
clusterManagerThrottlingStats.onThrottle("test-task", randomInt());
712+
}
692713
ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null;
693714
// TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
694715
return new NodeStats(
@@ -710,7 +731,8 @@ public static NodeStats createNodeStats() {
710731
scriptCacheStats,
711732
null,
712733
null,
713-
null
734+
null,
735+
clusterManagerThrottlingStats
714736
);
715737
}
716738

server/src/test/java/org/opensearch/cluster/DiskUsageTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public void testFillDiskUsage() {
186186
null,
187187
null,
188188
null,
189+
null,
189190
null
190191
),
191192
new NodeStats(
@@ -207,6 +208,7 @@ public void testFillDiskUsage() {
207208
null,
208209
null,
209210
null,
211+
null,
210212
null
211213
),
212214
new NodeStats(
@@ -228,6 +230,7 @@ public void testFillDiskUsage() {
228230
null,
229231
null,
230232
null,
233+
null,
231234
null
232235
)
233236
);
@@ -280,6 +283,7 @@ public void testFillDiskUsageSomeInvalidValues() {
280283
null,
281284
null,
282285
null,
286+
null,
283287
null
284288
),
285289
new NodeStats(
@@ -301,6 +305,7 @@ public void testFillDiskUsageSomeInvalidValues() {
301305
null,
302306
null,
303307
null,
308+
null,
304309
null
305310
),
306311
new NodeStats(
@@ -322,6 +327,7 @@ public void testFillDiskUsageSomeInvalidValues() {
322327
null,
323328
null,
324329
null,
330+
null,
325331
null
326332
)
327333
);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.service;
10+
11+
import org.opensearch.common.io.stream.Writeable;
12+
import org.opensearch.test.AbstractWireSerializingTestCase;
13+
14+
public class ClusterManagerThrottlingStatsTests extends AbstractWireSerializingTestCase<ClusterManagerThrottlingStats> {
15+
@Override
16+
protected Writeable.Reader<ClusterManagerThrottlingStats> instanceReader() {
17+
return ClusterManagerThrottlingStats::new;
18+
}
19+
20+
@Override
21+
protected ClusterManagerThrottlingStats createTestInstance() {
22+
return randomInstance();
23+
}
24+
25+
public static ClusterManagerThrottlingStats randomInstance() {
26+
ClusterManagerThrottlingStats randomStats = new ClusterManagerThrottlingStats();
27+
randomStats.onThrottle(randomAlphaOfLengthBetween(3, 10), randomInt());
28+
return randomStats;
29+
}
30+
}

0 commit comments

Comments
 (0)