Skip to content

Commit 46c9a21

Browse files
authored
[Search Pipelines] Add stats for search pipelines (opensearch-project#8053)
* [Search Pipelines] Add stats for search pipelines This adds statistics on executions and time spent on search pipeline operations, similar to the stats that are available for ingest pipelines. Signed-off-by: Michael Froh <froh@amazon.com> * Compare parsed JSON structure, not exact JSON string As @lukas-vlcek pointed out, asserting equality with an exact JSON string is sensitive to formatting, which makes the test brittle. Instead, we can parse the expected JSON and compare as Maps. Signed-off-by: Michael Froh <froh@amazon.com> * Refactor to common stats/metrics classes Search pipelines and ingest pipelines had identical functionality for tracking metrics around operations and converting those to immutable "stats" objects. That approach isn't even really specific to pipelines, but can be used to track metrics on any repeated operation, so I moved that common logic to the common.metrics package. Signed-off-by: Michael Froh <froh@amazon.com> * Split pipeline metrics tracking into its own class Thanks @saratvemulapalli for the suggestion! This lets the Pipeline class focus on transforming requests / responses, while the subclass focuses on tracking and managing metrics. Signed-off-by: Michael Froh <froh@amazon.com> --------- Signed-off-by: Michael Froh <froh@amazon.com>
1 parent 9856cb7 commit 46c9a21

33 files changed

+1398
-458
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8686
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
8787
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
8888
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
89+
- [Search pipelines] Added search pipelines output to node stats ([#8053](https://github.com/opensearch-project/OpenSearch/pull/8053))
8990
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))
9091
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))
9192
- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029)

modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/IngestRestartIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void testFailureInConditionalProcessor() {
132132
for (int k = 0; k < nodeCount; k++) {
133133
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
134134
for (IngestStats.ProcessorStat st : stats) {
135-
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
135+
assertThat(st.getStats().getCurrent(), greaterThanOrEqualTo(0L));
136136
}
137137
}
138138
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.script.ScriptCacheStats;
6060
import org.opensearch.script.ScriptStats;
6161
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
62+
import org.opensearch.search.pipeline.SearchPipelineStats;
6263
import org.opensearch.tasks.TaskCancellationStats;
6364
import org.opensearch.threadpool.ThreadPoolStats;
6465
import org.opensearch.transport.TransportStats;
@@ -138,6 +139,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
138139
@Nullable
139140
private TaskCancellationStats taskCancellationStats;
140141

142+
@Nullable
143+
private SearchPipelineStats searchPipelineStats;
144+
141145
public NodeStats(StreamInput in) throws IOException {
142146
super(in);
143147
timestamp = in.readVLong();
@@ -189,6 +193,11 @@ public NodeStats(StreamInput in) throws IOException {
189193
} else {
190194
taskCancellationStats = null;
191195
}
196+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO Update to 2_9_0 when we backport to 2.x
197+
searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new);
198+
} else {
199+
searchPipelineStats = null;
200+
}
192201
}
193202

194203
public NodeStats(
@@ -214,7 +223,8 @@ public NodeStats(
214223
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
215224
@Nullable WeightedRoutingStats weightedRoutingStats,
216225
@Nullable FileCacheStats fileCacheStats,
217-
@Nullable TaskCancellationStats taskCancellationStats
226+
@Nullable TaskCancellationStats taskCancellationStats,
227+
@Nullable SearchPipelineStats searchPipelineStats
218228
) {
219229
super(node);
220230
this.timestamp = timestamp;
@@ -239,6 +249,7 @@ public NodeStats(
239249
this.weightedRoutingStats = weightedRoutingStats;
240250
this.fileCacheStats = fileCacheStats;
241251
this.taskCancellationStats = taskCancellationStats;
252+
this.searchPipelineStats = searchPipelineStats;
242253
}
243254

244255
public long getTimestamp() {
@@ -371,6 +382,11 @@ public TaskCancellationStats getTaskCancellationStats() {
371382
return taskCancellationStats;
372383
}
373384

385+
@Nullable
386+
public SearchPipelineStats getSearchPipelineStats() {
387+
return searchPipelineStats;
388+
}
389+
374390
@Override
375391
public void writeTo(StreamOutput out) throws IOException {
376392
super.writeTo(out);
@@ -411,6 +427,9 @@ public void writeTo(StreamOutput out) throws IOException {
411427
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
412428
out.writeOptionalWriteable(taskCancellationStats);
413429
}
430+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update to 2_9_0 once we backport to 2.x
431+
out.writeOptionalWriteable(searchPipelineStats);
432+
}
414433
}
415434

416435
@Override
@@ -498,6 +517,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
498517
if (getTaskCancellationStats() != null) {
499518
getTaskCancellationStats().toXContent(builder, params);
500519
}
520+
if (getSearchPipelineStats() != null) {
521+
getSearchPipelineStats().toXContent(builder, params);
522+
}
501523

502524
return builder;
503525
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ public enum Metric {
211211
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
212212
WEIGHTED_ROUTING_STATS("weighted_routing"),
213213
FILE_CACHE_STATS("file_cache"),
214-
TASK_CANCELLATION("task_cancellation");
214+
TASK_CANCELLATION("task_cancellation"),
215+
SEARCH_PIPELINE("search_pipeline");
215216

216217
private String metricName;
217218

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
123123
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
124124
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
125125
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
126-
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
126+
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
127+
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
127128
);
128129
}
129130

server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
4141
import org.opensearch.cluster.node.DiscoveryNode;
4242
import org.opensearch.cluster.node.DiscoveryNodeRole;
43+
import org.opensearch.common.metrics.OperationStats;
4344
import org.opensearch.common.network.NetworkModule;
4445
import org.opensearch.common.settings.Settings;
4546
import org.opensearch.common.transport.TransportAddress;
@@ -800,18 +801,18 @@ static class IngestStats implements ToXContentFragment {
800801
pipelineIds.add(processorStats.getKey());
801802
for (org.opensearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
802803
stats.compute(stat.getType(), (k, v) -> {
803-
org.opensearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
804+
OperationStats nodeIngestStats = stat.getStats();
804805
if (v == null) {
805806
return new long[] {
806-
nodeIngestStats.getIngestCount(),
807-
nodeIngestStats.getIngestFailedCount(),
808-
nodeIngestStats.getIngestCurrent(),
809-
nodeIngestStats.getIngestTimeInMillis() };
807+
nodeIngestStats.getCount(),
808+
nodeIngestStats.getFailedCount(),
809+
nodeIngestStats.getCurrent(),
810+
nodeIngestStats.getTotalTimeInMillis() };
810811
} else {
811-
v[0] += nodeIngestStats.getIngestCount();
812-
v[1] += nodeIngestStats.getIngestFailedCount();
813-
v[2] += nodeIngestStats.getIngestCurrent();
814-
v[3] += nodeIngestStats.getIngestTimeInMillis();
812+
v[0] += nodeIngestStats.getCount();
813+
v[1] += nodeIngestStats.getFailedCount();
814+
v[2] += nodeIngestStats.getCurrent();
815+
v[3] += nodeIngestStats.getTotalTimeInMillis();
815816
return v;
816817
}
817818
});

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
167167
false,
168168
false,
169169
false,
170+
false,
170171
false
171172
);
172173
List<ShardStats> shardsStats = new ArrayList<>();

server/src/main/java/org/opensearch/common/metrics/MeanMetric.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public void inc(long n) {
4949
sum.add(n);
5050
}
5151

52+
public void add(MeanMetric other) {
53+
counter.add(other.counter.sum());
54+
sum.add(other.sum.sum());
55+
}
56+
5257
public void dec(long n) {
5358
counter.decrement();
5459
sum.add(-n);
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.metrics;
10+
11+
import java.util.concurrent.atomic.AtomicLong;
12+
13+
/**
14+
* Mutable tracker of a repeated operation.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class OperationMetrics {
19+
/**
20+
* The mean time it takes to complete the measured item.
21+
*/
22+
private final MeanMetric time = new MeanMetric();
23+
/**
24+
* The current count of things being measured.
25+
* Useful when aggregating multiple metrics to see how many things are in flight.
26+
*/
27+
private final AtomicLong current = new AtomicLong();
28+
/**
29+
* The non-decreasing count of failures
30+
*/
31+
private final CounterMetric failed = new CounterMetric();
32+
33+
/**
34+
* Invoked before the given operation begins.
35+
*/
36+
public void before() {
37+
current.incrementAndGet();
38+
}
39+
40+
/**
41+
* Invoked upon completion (success or failure) of the given operation
42+
* @param currentTime elapsed time of the operation
43+
*/
44+
public void after(long currentTime) {
45+
current.decrementAndGet();
46+
time.inc(currentTime);
47+
}
48+
49+
/**
50+
* Invoked upon failure of the operation.
51+
*/
52+
public void failed() {
53+
failed.inc();
54+
}
55+
56+
public void add(OperationMetrics other) {
57+
// Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
58+
failed.inc(other.failed.count());
59+
time.add(other.time);
60+
}
61+
62+
/**
63+
* @return an immutable snapshot of the current metric values.
64+
*/
65+
public OperationStats createStats() {
66+
return new OperationStats(time.count(), time.sum(), current.get(), failed.count());
67+
}
68+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.metrics;
10+
11+
import org.opensearch.common.io.stream.StreamInput;
12+
import org.opensearch.common.io.stream.StreamOutput;
13+
import org.opensearch.common.io.stream.Writeable;
14+
import org.opensearch.common.unit.TimeValue;
15+
import org.opensearch.core.xcontent.ToXContentFragment;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.Objects;
20+
import java.util.concurrent.TimeUnit;
21+
22+
/**
23+
* An immutable representation of a {@link OperationMetrics}
24+
*/
25+
public class OperationStats implements Writeable, ToXContentFragment {
26+
private final long count;
27+
private final long totalTimeInMillis;
28+
private final long current;
29+
private final long failedCount;
30+
31+
public OperationStats(long count, long totalTimeInMillis, long current, long failedCount) {
32+
this.count = count;
33+
this.totalTimeInMillis = totalTimeInMillis;
34+
this.current = current;
35+
this.failedCount = failedCount;
36+
}
37+
38+
/**
39+
* Read from a stream.
40+
*/
41+
public OperationStats(StreamInput in) throws IOException {
42+
count = in.readVLong();
43+
totalTimeInMillis = in.readVLong();
44+
current = in.readVLong();
45+
failedCount = in.readVLong();
46+
}
47+
48+
@Override
49+
public void writeTo(StreamOutput out) throws IOException {
50+
out.writeVLong(count);
51+
out.writeVLong(totalTimeInMillis);
52+
out.writeVLong(current);
53+
out.writeVLong(failedCount);
54+
}
55+
56+
/**
57+
* @return The total number of executed operations.
58+
*/
59+
public long getCount() {
60+
return count;
61+
}
62+
63+
/**
64+
* @return The total time spent of in millis.
65+
*/
66+
public long getTotalTimeInMillis() {
67+
return totalTimeInMillis;
68+
}
69+
70+
/**
71+
* @return The total number of operations currently executing.
72+
*/
73+
public long getCurrent() {
74+
return current;
75+
}
76+
77+
/**
78+
* @return The total number of operations that have failed.
79+
*/
80+
public long getFailedCount() {
81+
return failedCount;
82+
}
83+
84+
@Override
85+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
86+
return builder.field("count", count)
87+
.humanReadableField("time_in_millis", "time", new TimeValue(totalTimeInMillis, TimeUnit.MILLISECONDS))
88+
.field("current", current)
89+
.field("failed", failedCount);
90+
}
91+
92+
@Override
93+
public boolean equals(Object o) {
94+
if (this == o) return true;
95+
if (o == null || getClass() != o.getClass()) return false;
96+
OperationStats that = (OperationStats) o;
97+
return Objects.equals(count, that.count)
98+
&& Objects.equals(totalTimeInMillis, that.totalTimeInMillis)
99+
&& Objects.equals(failedCount, that.failedCount)
100+
&& Objects.equals(current, that.current);
101+
}
102+
103+
@Override
104+
public int hashCode() {
105+
return Objects.hash(count, totalTimeInMillis, failedCount, current);
106+
}
107+
}

0 commit comments

Comments
 (0)