Skip to content

Commit e4329b8

Browse files
committed
Optimize Cluster Stats Indices to precomute node level stats
Signed-off-by: Pranshu Shukla <pranshushukla06@gmail.com>
1 parent bda8393 commit e4329b8

File tree

9 files changed

+536
-42
lines changed

9 files changed

+536
-42
lines changed

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ public void testNodeCounts() {
8888
Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
8989
int numNodes = randomIntBetween(1, 5);
9090

91-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
91+
ClusterStatsResponse response = client().admin()
92+
.cluster()
93+
.prepareClusterStats()
94+
.useOptimizedClusterStatsResponse(randomBoolean())
95+
.get();
9296
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);
9397

9498
for (int i = 0; i < numNodes; i++) {
@@ -153,7 +157,11 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
153157
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);
154158

155159
Client client = client();
156-
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
160+
ClusterStatsResponse response = client.admin()
161+
.cluster()
162+
.prepareClusterStats()
163+
.useOptimizedClusterStatsResponse(randomBoolean())
164+
.get();
157165
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);
158166

159167
Set<String> expectedRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE.roleName());
@@ -179,7 +187,11 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
179187
public void testIndicesShardStats() throws ExecutionException, InterruptedException {
180188
internalCluster().startNode();
181189
ensureGreen();
182-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
190+
ClusterStatsResponse response = client().admin()
191+
.cluster()
192+
.prepareClusterStats()
193+
.useOptimizedClusterStatsResponse(randomBoolean())
194+
.get();
183195
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
184196

185197
prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();
@@ -195,14 +207,14 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
195207
ensureGreen();
196208
index("test1", "type", "1", "f", "f");
197209
refresh(); // make the doc visible
198-
response = client().admin().cluster().prepareClusterStats().get();
210+
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
199211
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
200212
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
201213
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);
202214

203215
prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
204216
ensureGreen();
205-
response = client().admin().cluster().prepareClusterStats().get();
217+
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
206218
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
207219
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
208220
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
@@ -225,7 +237,11 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
225237
internalCluster().startNodes(randomIntBetween(1, 3));
226238
index("test1", "type", "1", "f", "f");
227239

228-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
240+
ClusterStatsResponse response = client().admin()
241+
.cluster()
242+
.prepareClusterStats()
243+
.useOptimizedClusterStatsResponse(randomBoolean())
244+
.get();
229245
String msg = response.toString();
230246
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
231247
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));
@@ -265,13 +281,21 @@ public void testAllocatedProcessors() throws Exception {
265281
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
266282
waitForNodes(1);
267283

268-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
284+
ClusterStatsResponse response = client().admin()
285+
.cluster()
286+
.prepareClusterStats()
287+
.useOptimizedClusterStatsResponse(randomBoolean())
288+
.get();
269289
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
270290
}
271291

272292
public void testClusterStatusWhenStateNotRecovered() throws Exception {
273293
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
274-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
294+
ClusterStatsResponse response = client().admin()
295+
.cluster()
296+
.prepareClusterStats()
297+
.useOptimizedClusterStatsResponse(randomBoolean())
298+
.get();
275299
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
276300

277301
if (randomBoolean()) {
@@ -281,14 +305,18 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
281305
}
282306
// wait for the cluster status to settle
283307
ensureGreen();
284-
response = client().admin().cluster().prepareClusterStats().get();
308+
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
285309
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
286310
}
287311

288312
public void testFieldTypes() {
289313
internalCluster().startNode();
290314
ensureGreen();
291-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
315+
ClusterStatsResponse response = client().admin()
316+
.cluster()
317+
.prepareClusterStats()
318+
.useOptimizedClusterStatsResponse(randomBoolean())
319+
.get();
292320
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
293321
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());
294322

@@ -301,7 +329,7 @@ public void testFieldTypes() {
301329
+ "\"eggplant\":{\"type\":\"integer\"}}}}}"
302330
)
303331
.get();
304-
response = client().admin().cluster().prepareClusterStats().get();
332+
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
305333
assertThat(response.getIndicesStats().getMappings().getFieldTypeStats().size(), equalTo(3));
306334
Set<IndexFeatureStats> stats = response.getIndicesStats().getMappings().getFieldTypeStats();
307335
for (IndexFeatureStats stat : stats) {
@@ -329,7 +357,11 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
329357
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);
330358

331359
Client client = client();
332-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
360+
ClusterStatsResponse clusterStatsResponse = client.admin()
361+
.cluster()
362+
.prepareClusterStats()
363+
.useOptimizedClusterStatsResponse(randomBoolean())
364+
.get();
333365
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);
334366

335367
Set<String> expectedRoles = Set.of(
@@ -359,7 +391,11 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
359391
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);
360392

361393
Client client = client();
362-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
394+
ClusterStatsResponse clusterStatsResponse = client.admin()
395+
.cluster()
396+
.prepareClusterStats()
397+
.useOptimizedClusterStatsResponse(randomBoolean())
398+
.get();
363399
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);
364400

365401
Set<String> expectedRoles = Set.of(
@@ -383,7 +419,11 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
383419
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);
384420

385421
Client client = client();
386-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
422+
ClusterStatsResponse clusterStatsResponse = client.admin()
423+
.cluster()
424+
.prepareClusterStats()
425+
.useOptimizedClusterStatsResponse(randomBoolean())
426+
.get();
387427
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);
388428

389429
Set<String> expectedRoles = Set.of(
@@ -410,7 +450,11 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
410450
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);
411451

412452
Client client = client();
413-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
453+
ClusterStatsResponse clusterStatsResponse = client.admin()
454+
.cluster()
455+
.prepareClusterStats()
456+
.useOptimizedClusterStatsResponse(randomBoolean())
457+
.get();
414458
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);
415459

416460
Set<Set<String>> expectedNodesRoles = Set.of(

server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434

3535
import org.opensearch.action.admin.indices.stats.CommonStats;
3636
import org.opensearch.common.annotation.PublicApi;
37+
import org.opensearch.core.common.io.stream.StreamInput;
38+
import org.opensearch.core.common.io.stream.StreamOutput;
39+
import org.opensearch.core.common.io.stream.Writeable;
3740
import org.opensearch.core.xcontent.ToXContentFragment;
3841
import org.opensearch.core.xcontent.XContentBuilder;
3942
import org.opensearch.index.cache.query.QueryCacheStats;
@@ -78,26 +81,44 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
7881
this.segments = new SegmentsStats();
7982

8083
for (ClusterStatsNodeResponse r : nodeResponses) {
81-
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
82-
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
83-
if (indexShardStats == null) {
84-
indexShardStats = new ShardStats();
85-
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
86-
}
87-
88-
indexShardStats.total++;
89-
90-
CommonStats shardCommonStats = shardStats.getStats();
91-
92-
if (shardStats.getShardRouting().primary()) {
93-
indexShardStats.primaries++;
94-
docs.add(shardCommonStats.docs);
84+
// Optimized response from the node
85+
if (r.getNodeIndexShardStats() != null) {
86+
r.getNodeIndexShardStats().indexStatsMap.forEach(
87+
(index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> {
88+
v1.addStatsFrom(v2);
89+
return v1;
90+
})
91+
);
92+
93+
docs.add(r.getNodeIndexShardStats().docs);
94+
store.add(r.getNodeIndexShardStats().store);
95+
fieldData.add(r.getNodeIndexShardStats().fieldData);
96+
queryCache.add(r.getNodeIndexShardStats().queryCache);
97+
completion.add(r.getNodeIndexShardStats().completion);
98+
segments.add(r.getNodeIndexShardStats().segments);
99+
} else {
100+
// Default response from the node
101+
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
102+
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
103+
if (indexShardStats == null) {
104+
indexShardStats = new ShardStats();
105+
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
106+
}
107+
108+
indexShardStats.total++;
109+
110+
CommonStats shardCommonStats = shardStats.getStats();
111+
112+
if (shardStats.getShardRouting().primary()) {
113+
indexShardStats.primaries++;
114+
docs.add(shardCommonStats.docs);
115+
}
116+
store.add(shardCommonStats.store);
117+
fieldData.add(shardCommonStats.fieldData);
118+
queryCache.add(shardCommonStats.queryCache);
119+
completion.add(shardCommonStats.completion);
120+
segments.add(shardCommonStats.segments);
95121
}
96-
store.add(shardCommonStats.store);
97-
fieldData.add(shardCommonStats.fieldData);
98-
queryCache.add(shardCommonStats.queryCache);
99-
completion.add(shardCommonStats.completion);
100-
segments.add(shardCommonStats.segments);
101122
}
102123
}
103124

@@ -185,11 +206,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
185206
* @opensearch.api
186207
*/
187208
@PublicApi(since = "1.0.0")
188-
public static class ShardStats implements ToXContentFragment {
209+
public static class ShardStats implements ToXContentFragment, Writeable {
189210

190-
int indices;
191-
int total;
192-
int primaries;
211+
int indices = 0;
212+
int total = 0;
213+
int primaries = 0;
193214

194215
// min/max
195216
int minIndexShards = -1;
@@ -202,6 +223,12 @@ public static class ShardStats implements ToXContentFragment {
202223

203224
public ShardStats() {}
204225

226+
public ShardStats(StreamInput in) throws IOException {
227+
indices = in.readVInt();
228+
total = in.readVInt();
229+
primaries = in.readVInt();
230+
}
231+
205232
/**
206233
* number of indices in the cluster
207234
*/
@@ -329,6 +356,19 @@ public void addIndexShardCount(ShardStats indexShardCount) {
329356
}
330357
}
331358

359+
public void addStatsFrom(ShardStats incomingStats) {
360+
this.total += incomingStats.getTotal();
361+
this.indices += incomingStats.getIndices();
362+
this.primaries += incomingStats.getPrimaries();
363+
}
364+
365+
@Override
366+
public void writeTo(StreamOutput out) throws IOException {
367+
out.writeVInt(indices);
368+
out.writeVInt(total);
369+
out.writeVInt(primaries);
370+
}
371+
332372
/**
333373
* Inner Fields used for creating XContent and parsing
334374
*

server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.action.admin.cluster.stats;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
3637
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
3738
import org.opensearch.action.admin.indices.stats.ShardStats;
@@ -55,6 +56,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
5556
private final NodeStats nodeStats;
5657
private final ShardStats[] shardsStats;
5758
private ClusterHealthStatus clusterStatus;
59+
private NodeIndexShardStats nodeIndexShardStats;
5860

5961
public ClusterStatsNodeResponse(StreamInput in) throws IOException {
6062
super(in);
@@ -64,7 +66,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
6466
}
6567
this.nodeInfo = new NodeInfo(in);
6668
this.nodeStats = new NodeStats(in);
67-
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
69+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
70+
this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new);
71+
this.nodeIndexShardStats = in.readOptionalWriteable(NodeIndexShardStats::new);
72+
} else {
73+
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
74+
}
6875
}
6976

7077
public ClusterStatsNodeResponse(
@@ -81,6 +88,24 @@ public ClusterStatsNodeResponse(
8188
this.clusterStatus = clusterStatus;
8289
}
8390

91+
public ClusterStatsNodeResponse(
92+
DiscoveryNode node,
93+
@Nullable ClusterHealthStatus clusterStatus,
94+
NodeInfo nodeInfo,
95+
NodeStats nodeStats,
96+
ShardStats[] shardsStats,
97+
boolean optimized
98+
) {
99+
super(node);
100+
this.nodeInfo = nodeInfo;
101+
this.nodeStats = nodeStats;
102+
if (optimized) {
103+
this.nodeIndexShardStats = new NodeIndexShardStats(node, shardsStats);
104+
}
105+
this.shardsStats = shardsStats;
106+
this.clusterStatus = clusterStatus;
107+
}
108+
84109
public NodeInfo nodeInfo() {
85110
return this.nodeInfo;
86111
}
@@ -101,6 +126,10 @@ public ShardStats[] shardsStats() {
101126
return this.shardsStats;
102127
}
103128

129+
public NodeIndexShardStats getNodeIndexShardStats() {
130+
return nodeIndexShardStats;
131+
}
132+
104133
public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
105134
return new ClusterStatsNodeResponse(in);
106135
}
@@ -116,6 +145,16 @@ public void writeTo(StreamOutput out) throws IOException {
116145
}
117146
nodeInfo.writeTo(out);
118147
nodeStats.writeTo(out);
119-
out.writeArray(shardsStats);
148+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
149+
if (nodeIndexShardStats != null) {
150+
out.writeOptionalArray(null);
151+
out.writeOptionalWriteable(nodeIndexShardStats);
152+
} else {
153+
out.writeOptionalArray(shardsStats);
154+
out.writeOptionalWriteable(null);
155+
}
156+
} else {
157+
out.writeArray(shardsStats);
158+
}
120159
}
121160
}

0 commit comments

Comments
 (0)