Skip to content

Commit 200ad5d

Browse files
authored
Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight (#10352)
* Fix timer race condition in profile rewrite and create weight for concurrent segment search (#10352) Signed-off-by: Ticheng Lin <ticheng@amazon.com> * Refactor and work on the PR comments (#10352) Signed-off-by: Ticheng Lin <ticheng@amazon.com> --------- Signed-off-by: Ticheng Lin <ticheng@amazon.com>
1 parent 781968b commit 200ad5d

File tree

11 files changed

+438
-23
lines changed

11 files changed

+438
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9292
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
9393
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
9494
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
95+
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
9596

9697
### Dependencies
9798
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))

server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java

Lines changed: 155 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.search.profile.query;
3434

35+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
36+
3537
import org.apache.lucene.tests.util.English;
3638
import org.opensearch.action.index.IndexRequestBuilder;
3739
import org.opensearch.action.search.MultiSearchResponse;
@@ -40,29 +42,56 @@
4042
import org.opensearch.action.search.SearchType;
4143
import org.opensearch.action.search.ShardSearchFailure;
4244
import org.opensearch.common.settings.Settings;
45+
import org.opensearch.common.util.FeatureFlags;
4346
import org.opensearch.index.query.QueryBuilder;
4447
import org.opensearch.index.query.QueryBuilders;
4548
import org.opensearch.search.SearchHit;
4649
import org.opensearch.search.profile.ProfileResult;
4750
import org.opensearch.search.profile.ProfileShardResult;
4851
import org.opensearch.search.sort.SortOrder;
49-
import org.opensearch.test.OpenSearchIntegTestCase;
52+
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;
5053

5154
import java.util.Arrays;
55+
import java.util.Collection;
5256
import java.util.HashSet;
5357
import java.util.List;
5458
import java.util.Map;
5559
import java.util.Set;
5660

61+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
5762
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
5863
import static org.hamcrest.Matchers.emptyOrNullString;
5964
import static org.hamcrest.Matchers.equalTo;
6065
import static org.hamcrest.Matchers.greaterThan;
6166
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
6267
import static org.hamcrest.Matchers.is;
6368
import static org.hamcrest.Matchers.not;
69+
import static org.hamcrest.Matchers.nullValue;
70+
71+
public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
72+
private final boolean concurrentSearchEnabled;
73+
private static final String MAX_PREFIX = "max_";
74+
private static final String MIN_PREFIX = "min_";
75+
private static final String AVG_PREFIX = "avg_";
76+
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";
77+
78+
public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
79+
super(settings);
80+
this.concurrentSearchEnabled = concurrentSearchEnabled;
81+
}
6482

65-
public class QueryProfilerIT extends OpenSearchIntegTestCase {
83+
@ParametersFactory
84+
public static Collection<Object[]> parameters() {
85+
return Arrays.asList(
86+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false },
87+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true }
88+
);
89+
}
90+
91+
@Override
92+
protected Settings featureFlagSettings() {
93+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
94+
}
6695

6796
/**
6897
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
@@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
229258
assertEquals(result.getLuceneDescription(), "field1:one");
230259
assertThat(result.getTime(), greaterThan(0L));
231260
assertNotNull(result.getTimeBreakdown());
261+
assertQueryProfileResult(result);
232262
}
233263

234264
CollectorResult result = searchProfiles.getCollectorResult();
@@ -271,6 +301,7 @@ public void testBool() throws Exception {
271301
assertThat(result.getTime(), greaterThan(0L));
272302
assertNotNull(result.getTimeBreakdown());
273303
assertEquals(result.getProfiledChildren().size(), 2);
304+
assertQueryProfileResult(result);
274305

275306
// Check the children
276307
List<ProfileResult> children = result.getProfiledChildren();
@@ -282,12 +313,14 @@ public void testBool() throws Exception {
282313
assertThat(childProfile.getTime(), greaterThan(0L));
283314
assertNotNull(childProfile.getTimeBreakdown());
284315
assertEquals(childProfile.getProfiledChildren().size(), 0);
316+
assertQueryProfileResult(childProfile);
285317

286318
childProfile = children.get(1);
287319
assertEquals(childProfile.getQueryName(), "TermQuery");
288320
assertEquals(childProfile.getLuceneDescription(), "field1:two");
289321
assertThat(childProfile.getTime(), greaterThan(0L));
290322
assertNotNull(childProfile.getTimeBreakdown());
323+
assertQueryProfileResult(childProfile);
291324
}
292325

293326
CollectorResult result = searchProfiles.getCollectorResult();
@@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
330363
assertNotNull(result.getLuceneDescription());
331364
assertThat(result.getTime(), greaterThan(0L));
332365
assertNotNull(result.getTimeBreakdown());
366+
assertQueryProfileResult(result);
333367
}
334368

335369
CollectorResult result = searchProfiles.getCollectorResult();
@@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
375409
assertNotNull(result.getLuceneDescription());
376410
assertThat(result.getTime(), greaterThan(0L));
377411
assertNotNull(result.getTimeBreakdown());
412+
assertQueryProfileResult(result);
378413
}
379414

380415
CollectorResult result = searchProfiles.getCollectorResult();
@@ -415,6 +450,90 @@ public void testBoosting() throws Exception {
415450
assertNotNull(result.getLuceneDescription());
416451
assertThat(result.getTime(), greaterThan(0L));
417452
assertNotNull(result.getTimeBreakdown());
453+
assertQueryProfileResult(result);
454+
}
455+
456+
CollectorResult result = searchProfiles.getCollectorResult();
457+
assertThat(result.getName(), is(not(emptyOrNullString())));
458+
assertThat(result.getTime(), greaterThan(0L));
459+
}
460+
}
461+
}
462+
463+
public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
464+
createIndex("test");
465+
ensureGreen();
466+
467+
int numDocs = 122;
468+
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
469+
for (int i = 0; i < numDocs; i++) {
470+
docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
471+
}
472+
473+
List<String> terms = Arrays.asList("zero", "zero", "one");
474+
475+
indexRandom(true, docs);
476+
477+
refresh();
478+
479+
QueryBuilder q = QueryBuilders.boostingQuery(
480+
QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())),
481+
QueryBuilders.termsQuery("field1", terms)
482+
).boost(randomFloat()).negativeBoost(randomFloat());
483+
logger.info("Query: {}", q);
484+
485+
SearchResponse resp = client().prepareSearch()
486+
.setQuery(q)
487+
.setTrackTotalHits(true)
488+
.setProfile(true)
489+
.setSearchType(SearchType.QUERY_THEN_FETCH)
490+
.get();
491+
492+
assertNotNull("Profile response element should not be null", resp.getProfileResults());
493+
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));
494+
495+
for (Map.Entry<String, ProfileShardResult> shardResult : resp.getProfileResults().entrySet()) {
496+
assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
497+
assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
498+
for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) {
499+
List<ProfileResult> results = searchProfiles.getQueryResults();
500+
for (ProfileResult result : results) {
501+
assertNotNull(result.getQueryName());
502+
assertNotNull(result.getLuceneDescription());
503+
assertThat(result.getTime(), greaterThan(0L));
504+
Map<String, Long> breakdown = result.getTimeBreakdown();
505+
Long maxSliceTime = result.getMaxSliceTime();
506+
Long minSliceTime = result.getMinSliceTime();
507+
Long avgSliceTime = result.getAvgSliceTime();
508+
if (concurrentSearchEnabled && results.get(0).equals(result)) {
509+
assertNotNull(maxSliceTime);
510+
assertNotNull(minSliceTime);
511+
assertNotNull(avgSliceTime);
512+
assertThat(breakdown.size(), equalTo(66));
513+
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
514+
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
515+
String maxTimingType = MAX_PREFIX + queryTimingType;
516+
String minTimingType = MIN_PREFIX + queryTimingType;
517+
String avgTimingType = AVG_PREFIX + queryTimingType;
518+
assertNotNull(breakdown.get(maxTimingType));
519+
assertNotNull(breakdown.get(minTimingType));
520+
assertNotNull(breakdown.get(avgTimingType));
521+
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
522+
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
523+
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
524+
}
525+
}
526+
} else if (concurrentSearchEnabled) {
527+
assertThat(maxSliceTime, equalTo(0L));
528+
assertThat(minSliceTime, equalTo(0L));
529+
assertThat(avgSliceTime, equalTo(0L));
530+
assertThat(breakdown.size(), equalTo(27));
531+
} else {
532+
assertThat(maxSliceTime, is(nullValue()));
533+
assertThat(minSliceTime, is(nullValue()));
534+
assertThat(avgSliceTime, is(nullValue()));
535+
assertThat(breakdown.size(), equalTo(27));
536+
}
418537
}
419538

420539
CollectorResult result = searchProfiles.getCollectorResult();
@@ -455,6 +574,7 @@ public void testDisMaxRange() throws Exception {
455574
assertNotNull(result.getLuceneDescription());
456575
assertThat(result.getTime(), greaterThan(0L));
457576
assertNotNull(result.getTimeBreakdown());
577+
assertQueryProfileResult(result);
458578
}
459579

460580
CollectorResult result = searchProfiles.getCollectorResult();
@@ -494,6 +614,7 @@ public void testRange() throws Exception {
494614
assertNotNull(result.getLuceneDescription());
495615
assertThat(result.getTime(), greaterThan(0L));
496616
assertNotNull(result.getTimeBreakdown());
617+
assertQueryProfileResult(result);
497618
}
498619

499620
CollectorResult result = searchProfiles.getCollectorResult();
@@ -547,6 +668,7 @@ public void testPhrase() throws Exception {
547668
assertNotNull(result.getLuceneDescription());
548669
assertThat(result.getTime(), greaterThan(0L));
549670
assertNotNull(result.getTimeBreakdown());
671+
assertQueryProfileResult(result);
550672
}
551673

552674
CollectorResult result = searchProfiles.getCollectorResult();
@@ -579,4 +701,35 @@ public void testNoProfile() throws Exception {
579701
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
580702
}
581703

704+
private void assertQueryProfileResult(ProfileResult result) {
705+
Map<String, Long> breakdown = result.getTimeBreakdown();
706+
Long maxSliceTime = result.getMaxSliceTime();
707+
Long minSliceTime = result.getMinSliceTime();
708+
Long avgSliceTime = result.getAvgSliceTime();
709+
if (concurrentSearchEnabled) {
710+
assertNotNull(maxSliceTime);
711+
assertNotNull(minSliceTime);
712+
assertNotNull(avgSliceTime);
713+
assertThat(breakdown.size(), equalTo(66));
714+
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
715+
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
716+
String maxTimingType = MAX_PREFIX + queryTimingType;
717+
String minTimingType = MIN_PREFIX + queryTimingType;
718+
String avgTimingType = AVG_PREFIX + queryTimingType;
719+
assertNotNull(breakdown.get(maxTimingType));
720+
assertNotNull(breakdown.get(minTimingType));
721+
assertNotNull(breakdown.get(avgTimingType));
722+
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
723+
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
724+
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
725+
}
726+
}
727+
} else {
728+
assertThat(maxSliceTime, is(nullValue()));
729+
assertThat(minSliceTime, is(nullValue()));
730+
assertThat(avgSliceTime, is(nullValue()));
731+
assertThat(breakdown.size(), equalTo(27));
732+
}
733+
}
734+
582735
}

server/src/main/java/org/opensearch/search/profile/Profilers.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import org.opensearch.search.internal.ContextIndexSearcher;
3636
import org.opensearch.search.profile.aggregation.AggregationProfiler;
3737
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
38+
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
39+
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
40+
import org.opensearch.search.profile.query.InternalQueryProfileTree;
3841
import org.opensearch.search.profile.query.QueryProfiler;
3942

4043
import java.util.ArrayList;
@@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc
6467

6568
/** Switch to a new profile. */
6669
public QueryProfiler addQueryProfiler() {
67-
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
70+
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
71+
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
72+
: new QueryProfiler(new InternalQueryProfileTree());
6873
searcher.setProfiler(profiler);
6974
queryProfilers.add(profiler);
7075
return profiler;

server/src/main/java/org/opensearch/search/profile/Timer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ public class Timer {
5353
private boolean doTiming;
5454
private long timing, count, lastCount, start, earliestTimerStartTime;
5555

56+
public Timer() {
57+
this(0, 0, 0, 0, 0);
58+
}
59+
60+
public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
61+
this.timing = timing;
62+
this.count = count;
63+
this.lastCount = lastCount;
64+
this.start = start;
65+
this.earliestTimerStartTime = earliestTimerStartTime;
66+
}
67+
5668
/** pkg-private for testing */
5769
long nanoTime() {
5870
return System.nanoTime();

server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,11 @@ public void startRewriteTime() {
5454
* startRewriteTime() must be called for a particular context prior to calling
5555
* stopAndAddRewriteTime(), otherwise the elapsed time will be negative and
5656
* nonsensical
57-
*
58-
* @return The elapsed time
5957
*/
60-
public long stopAndAddRewriteTime() {
58+
public void stopAndAddRewriteTime() {
6159
long time = Math.max(1, System.nanoTime() - rewriteScratch);
6260
rewriteTime += time;
6361
rewriteScratch = 0;
64-
return time;
6562
}
6663

6764
public long getRewriteTime() {

server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,29 @@ public Map<String, Long> toBreakdownMap() {
7070
);
7171
final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString());
7272

73-
if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) {
73+
if (contexts.isEmpty()) {
7474
// If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the
7575
// create_weight time/count
7676
queryNodeTime = createWeightTime;
7777
maxSliceNodeTime = 0L;
7878
minSliceNodeTime = 0L;
7979
avgSliceNodeTime = 0L;
8080
return buildDefaultQueryBreakdownMap(createWeightTime);
81+
} else if (sliceCollectorsToLeaves.isEmpty()) {
82+
// This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It
83+
// creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for
84+
// the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later
85+
// in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no
86+
// concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination.
87+
AbstractProfileBreakdown<QueryTimingType> breakdown = contexts.values().iterator().next();
88+
queryNodeTime = breakdown.toNodeTime() + createWeightTime;
89+
maxSliceNodeTime = 0L;
90+
minSliceNodeTime = 0L;
91+
avgSliceNodeTime = 0L;
92+
Map<String, Long> queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap());
93+
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime);
94+
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L);
95+
return queryBreakdownMap;
8196
}
8297

8398
// first create the slice level breakdowns
@@ -191,10 +206,12 @@ Map<Collector, Map<String, Long>> buildSliceLevelBreakdown() {
191206
}
192207
// compute sliceMaxEndTime as max of sliceEndTime across all timing types
193208
sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE));
194-
sliceMinStartTime = Math.min(
195-
sliceMinStartTime,
196-
currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE)
197-
);
209+
long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE);
210+
if (currentSliceStartTime == 0L) {
211+
// The timer for the current timing type never starts, so we continue here
212+
continue;
213+
}
214+
sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime);
198215
// compute total time for each timing type at slice level using sliceEndTime and sliceStartTime
199216
currentSliceBreakdown.put(
200217
timingType.toString(),

0 commit comments

Comments
 (0)