Skip to content

Commit ac67f99

Browse files
kaushalmahi12wangdongyu.danny
authored and
wangdongyu.danny
committed
Bug/sbp cancellation (opensearch-project#13474)
* change cancellation logic to fix disparity between trackers and resource duress Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add additional tests for searchBackpressureService and refactor code Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add enumMap instead of list for tracking taskResourceUsageTrackers Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add nodeNotInDuress test for nodeDuressTrackers class Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * address comments Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add entry in CHANGELOG Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * address comments Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * address comments Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * remove wildcard import Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * streamline imports Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * address comments Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add additional test case to test the circuit breaker for SBP logic Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add missing javadoc to resource type enum Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * add javadoc to a method Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * fix javadoc warnings Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> * fix javadoc warnings Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com> --------- Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent d276d59 commit ac67f99

23 files changed

+915
-335
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
### Removed
3434

3535
### Fixed
36+
- Fix bug in SBP cancellation logic ([#13474](https://github.com/opensearch-project/OpenSearch/pull/13474))
3637
- Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379))
3738
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
3839
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search;
10+
11+
/**
 * Enumerates the resource types known to this package. Each constant carries a
 * lower-case string name that {@link #fromName(String)} uses for lookup.
 */
public enum ResourceType {
    /** Resource type identified by the name {@code "cpu"}. */
    CPU("cpu"),
    /** Resource type identified by the name {@code "jvm"}. */
    JVM("jvm");

    private final String name;

    ResourceType(String name) {
        this.name = name;
    }

    /**
     * Resolves a {@link ResourceType} from its string name.
     * The string match here is case-sensitive, so {@code "cpu"} resolves but {@code "CPU"} does not.
     *
     * @param s name matching the resource type name
     * @return a {@link ResourceType}
     * @throws IllegalArgumentException if no constant has the given name; a {@code null}
     *         argument never matches and therefore also ends up here
     */
    public static ResourceType fromName(String s) {
        for (ResourceType candidate : values()) {
            if (candidate.getName().equals(s)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown resource type: [" + s + "]");
    }

    /**
     * @return the string name this constant was declared with
     */
    private String getName() {
        return name;
    }
}

server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java

Lines changed: 146 additions & 95 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
1919
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
2020
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
21-
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
2221
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
22+
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
2323

2424
import java.io.IOException;
2525
import java.util.Map;

server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
1919
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
2020
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
21-
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
2221
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
22+
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
2323

2424
import java.io.IOException;
2525
import java.util.Map;

server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.core.common.io.stream.StreamInput;
1313
import org.opensearch.core.common.io.stream.StreamOutput;
1414
import org.opensearch.core.xcontent.XContentBuilder;
15+
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
1516
import org.opensearch.tasks.Task;
1617
import org.opensearch.tasks.TaskCancellation;
1718

@@ -34,35 +35,37 @@ public class CpuUsageTracker extends TaskResourceUsageTracker {
3435
private final LongSupplier thresholdSupplier;
3536

3637
public CpuUsageTracker(LongSupplier thresholdSupplier) {
38+
this(thresholdSupplier, (task) -> {
39+
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
40+
long threshold = thresholdSupplier.getAsLong();
41+
42+
if (usage < threshold) {
43+
return Optional.empty();
44+
}
45+
46+
return Optional.of(
47+
new TaskCancellation.Reason(
48+
"cpu usage exceeded ["
49+
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
50+
+ " >= "
51+
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
52+
+ "]",
53+
1 // TODO: fine-tune the cancellation score/weight
54+
)
55+
);
56+
});
57+
}
58+
59+
public CpuUsageTracker(LongSupplier thresholdSupplier, ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) {
3760
this.thresholdSupplier = thresholdSupplier;
61+
this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
3862
}
3963

4064
@Override
4165
public String name() {
4266
return CPU_USAGE_TRACKER.getName();
4367
}
4468

45-
@Override
46-
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
47-
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
48-
long threshold = thresholdSupplier.getAsLong();
49-
50-
if (usage < threshold) {
51-
return Optional.empty();
52-
}
53-
54-
return Optional.of(
55-
new TaskCancellation.Reason(
56-
"cpu usage exceeded ["
57-
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
58-
+ " >= "
59-
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
60-
+ "]",
61-
1 // TODO: fine-tune the cancellation score/weight
62-
)
63-
);
64-
}
65-
6669
@Override
6770
public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
6871
long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0);

server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.core.common.io.stream.StreamInput;
1313
import org.opensearch.core.common.io.stream.StreamOutput;
1414
import org.opensearch.core.xcontent.XContentBuilder;
15+
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
1516
import org.opensearch.tasks.Task;
1617
import org.opensearch.tasks.TaskCancellation;
1718

@@ -34,36 +35,42 @@ public class ElapsedTimeTracker extends TaskResourceUsageTracker {
3435
private final LongSupplier timeNanosSupplier;
3536

3637
public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) {
38+
this(thresholdSupplier, timeNanosSupplier, (Task task) -> {
39+
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
40+
long threshold = thresholdSupplier.getAsLong();
41+
42+
if (usage < threshold) {
43+
return Optional.empty();
44+
}
45+
46+
return Optional.of(
47+
new TaskCancellation.Reason(
48+
"elapsed time exceeded ["
49+
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
50+
+ " >= "
51+
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
52+
+ "]",
53+
1 // TODO: fine-tune the cancellation score/weight
54+
)
55+
);
56+
});
57+
}
58+
59+
public ElapsedTimeTracker(
60+
LongSupplier thresholdSupplier,
61+
LongSupplier timeNanosSupplier,
62+
ResourceUsageBreachEvaluator resourceUsageBreachEvaluator
63+
) {
3764
this.thresholdSupplier = thresholdSupplier;
3865
this.timeNanosSupplier = timeNanosSupplier;
66+
this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
3967
}
4068

4169
@Override
4270
public String name() {
4371
return ELAPSED_TIME_TRACKER.getName();
4472
}
4573

46-
@Override
47-
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
48-
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
49-
long threshold = thresholdSupplier.getAsLong();
50-
51-
if (usage < threshold) {
52-
return Optional.empty();
53-
}
54-
55-
return Optional.of(
56-
new TaskCancellation.Reason(
57-
"elapsed time exceeded ["
58-
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
59-
+ " >= "
60-
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
61-
+ "]",
62-
1 // TODO: fine-tune the cancellation score/weight
63-
)
64-
);
65-
}
66-
6774
@Override
6875
public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
6976
long now = timeNanosSupplier.getAsLong();

server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.core.common.unit.ByteSizeValue;
1919
import org.opensearch.core.xcontent.XContentBuilder;
2020
import org.opensearch.monitor.jvm.JvmStats;
21+
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
2122
import org.opensearch.tasks.CancellableTask;
2223
import org.opensearch.tasks.Task;
2324
import org.opensearch.tasks.TaskCancellation;
@@ -55,6 +56,43 @@ public HeapUsageTracker(
5556
this.heapPercentThresholdSupplier = heapPercentThresholdSupplier;
5657
this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize));
5758
clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize);
59+
setDefaultResourceUsageBreachEvaluator();
60+
}
61+
62+
/**
63+
* Extracted from the constructor because a lambda that references a member variable cannot be passed as an argument to another constructor call; the compiler rejects it with:
64+
* error: cannot reference movingAverageReference before supertype constructor has been called
65+
*/
66+
private void setDefaultResourceUsageBreachEvaluator() {
67+
this.resourceUsageBreachEvaluator = (task) -> {
68+
MovingAverage movingAverage = movingAverageReference.get();
69+
70+
// There haven't been enough measurements.
71+
if (movingAverage.isReady() == false) {
72+
return Optional.empty();
73+
}
74+
75+
double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
76+
double averageUsage = movingAverage.getAverage();
77+
double variance = heapVarianceSupplier.getAsDouble();
78+
double allowedUsage = averageUsage * variance;
79+
double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;
80+
81+
if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) {
82+
return Optional.empty();
83+
}
84+
85+
return Optional.of(
86+
new TaskCancellation.Reason(
87+
"heap usage exceeded ["
88+
+ new ByteSizeValue((long) currentUsage)
89+
+ " >= "
90+
+ new ByteSizeValue((long) allowedUsage)
91+
+ "]",
92+
(int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight
93+
)
94+
);
95+
};
5896
}
5997

6098
@Override
@@ -67,33 +105,6 @@ public void update(Task task) {
67105
movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes());
68106
}
69107

70-
@Override
71-
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
72-
MovingAverage movingAverage = movingAverageReference.get();
73-
74-
// There haven't been enough measurements.
75-
if (movingAverage.isReady() == false) {
76-
return Optional.empty();
77-
}
78-
79-
double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
80-
double averageUsage = movingAverage.getAverage();
81-
double variance = heapVarianceSupplier.getAsDouble();
82-
double allowedUsage = averageUsage * variance;
83-
double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;
84-
85-
if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) {
86-
return Optional.empty();
87-
}
88-
89-
return Optional.of(
90-
new TaskCancellation.Reason(
91-
"heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]",
92-
(int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight
93-
)
94-
);
95-
}
96-
97108
private void updateWindowSize(int heapMovingAverageWindowSize) {
98109
this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize));
99110
}

server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)