Skip to content

Commit 9acd749

Browse files
bowenlan-amznkaushalmahi12
authored andcommitted
Use circuit breaker in InternalHistogram when adding empty buckets (opensearch-project#14754)
* introduce circuit breaker in InternalHistogram Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> * use circuit breaker from reduce context Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> * add test Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> * revert use_real_memory change in OpenSearchNode Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> * add change log Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> --------- Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent c4164c5 commit 9acd749

File tree

3 files changed

+49
-1
lines changed

3 files changed

+49
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
7979
- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches the index template ([#12891](https://github.com/opensearch-project/OpenSearch/pull/12891))
8080
- Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385))
8181
- Fix constant_keyword field type used when creating index ([#14807](https://github.com/opensearch-project/OpenSearch/pull/14807))
82+
- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754))
8283

8384
### Security
8485

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,25 +395,28 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
395395
// fill with empty buckets
396396
for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
397397
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
398+
reduceContext.consumeBucketsAndMaybeBreak(0);
398399
}
399400
} else {
400401
Bucket first = list.get(iter.nextIndex());
401402
if (Double.isFinite(emptyBucketInfo.minBound)) {
402403
// fill with empty buckets until the first key
403404
for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) {
404405
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
406+
reduceContext.consumeBucketsAndMaybeBreak(0);
405407
}
406408
}
407409

408410
// now adding the empty buckets within the actual data,
409-
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
411+
// e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6
410412
Bucket lastBucket = null;
411413
do {
412414
Bucket nextBucket = list.get(iter.nextIndex());
413415
if (lastBucket != null) {
414416
double key = nextKey(lastBucket.key);
415417
while (key < nextBucket.key) {
416418
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
419+
reduceContext.consumeBucketsAndMaybeBreak(0);
417420
key = nextKey(key);
418421
}
419422
assert key == nextBucket.key || Double.isNaN(nextBucket.key) : "key: " + key + ", nextBucket.key: " + nextBucket.key;
@@ -424,6 +427,7 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
424427
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
425428
for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
426429
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
430+
reduceContext.consumeBucketsAndMaybeBreak(0);
427431
}
428432
}
429433
}

server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogramTests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,15 @@
3333
package org.opensearch.search.aggregations.bucket.histogram;
3434

3535
import org.apache.lucene.tests.util.TestUtil;
36+
import org.opensearch.core.common.breaker.CircuitBreaker;
37+
import org.opensearch.core.common.breaker.CircuitBreakingException;
3638
import org.opensearch.search.DocValueFormat;
3739
import org.opensearch.search.aggregations.BucketOrder;
40+
import org.opensearch.search.aggregations.InternalAggregation;
3841
import org.opensearch.search.aggregations.InternalAggregations;
42+
import org.opensearch.search.aggregations.MultiBucketConsumerService;
3943
import org.opensearch.search.aggregations.ParsedMultiBucketAggregation;
44+
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
4045
import org.opensearch.test.InternalAggregationTestCase;
4146
import org.opensearch.test.InternalMultiBucketAggregationTestCase;
4247

@@ -47,6 +52,8 @@
4752
import java.util.Map;
4853
import java.util.TreeMap;
4954

55+
import org.mockito.Mockito;
56+
5057
public class InternalHistogramTests extends InternalMultiBucketAggregationTestCase<InternalHistogram> {
5158

5259
private boolean keyed;
@@ -123,6 +130,42 @@ public void testHandlesNaN() {
123130
);
124131
}
125132

133+
public void testCircuitBreakerWhenAddEmptyBuckets() {
134+
String name = randomAlphaOfLength(5);
135+
double interval = 1;
136+
double lowerBound = 1;
137+
double upperBound = 1026;
138+
List<InternalHistogram.Bucket> bucket1 = List.of(
139+
new InternalHistogram.Bucket(lowerBound, 1, false, format, InternalAggregations.EMPTY)
140+
);
141+
List<InternalHistogram.Bucket> bucket2 = List.of(
142+
new InternalHistogram.Bucket(upperBound, 1, false, format, InternalAggregations.EMPTY)
143+
);
144+
BucketOrder order = BucketOrder.key(true);
145+
InternalHistogram.EmptyBucketInfo emptyBucketInfo = new InternalHistogram.EmptyBucketInfo(
146+
interval,
147+
0,
148+
lowerBound,
149+
upperBound,
150+
InternalAggregations.EMPTY
151+
);
152+
InternalHistogram histogram1 = new InternalHistogram(name, bucket1, order, 0, emptyBucketInfo, format, false, null);
153+
InternalHistogram histogram2 = new InternalHistogram(name, bucket2, order, 0, emptyBucketInfo, format, false, null);
154+
155+
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
156+
Mockito.when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);
157+
158+
MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(0, breaker);
159+
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
160+
null,
161+
null,
162+
bucketConsumer,
163+
PipelineAggregator.PipelineTree.EMPTY
164+
);
165+
expectThrows(CircuitBreakingException.class, () -> histogram1.reduce(List.of(histogram1, histogram2), reduceContext));
166+
Mockito.verify(breaker, Mockito.times(1)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
167+
}
168+
126169
@Override
127170
protected void assertReduced(InternalHistogram reduced, List<InternalHistogram> inputs) {
128171
TreeMap<Double, Long> expectedCounts = new TreeMap<>();

0 commit comments

Comments
 (0)