Skip to content

Commit fa7454f

Browse files
ticheng-awssohami
authored and committed
Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight (opensearch-project#10352)
* Fix timer race condition in profile rewrite and create weight for concurrent segment search (opensearch-project#10352) Signed-off-by: Ticheng Lin <ticheng@amazon.com> * Refactor and work on the PR comments (opensearch-project#10352) Signed-off-by: Ticheng Lin <ticheng@amazon.com> --------- Signed-off-by: Ticheng Lin <ticheng@amazon.com>
1 parent 09a1277 commit fa7454f

File tree

11 files changed

+366
-21
lines changed

11 files changed

+366
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
1818
- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866))
1919
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
20+
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
2021

2122
### Dependencies
2223
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))

server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,89 @@ public void testBoosting() throws Exception {
460460
}
461461
}
462462

463+
public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
464+
createIndex("test");
465+
ensureGreen();
466+
467+
int numDocs = 122;
468+
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
469+
for (int i = 0; i < numDocs; i++) {
470+
docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
471+
}
472+
473+
List<String> terms = Arrays.asList("zero", "zero", "one");
474+
475+
indexRandom(true, docs);
476+
477+
refresh();
478+
479+
QueryBuilder q = QueryBuilders.boostingQuery(
480+
QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())),
481+
QueryBuilders.termsQuery("field1", terms)
482+
).boost(randomFloat()).negativeBoost(randomFloat());
483+
logger.info("Query: {}", q);
484+
485+
SearchResponse resp = client().prepareSearch()
486+
.setQuery(q)
487+
.setTrackTotalHits(true)
488+
.setProfile(true)
489+
.setSearchType(SearchType.QUERY_THEN_FETCH)
490+
.get();
491+
492+
assertNotNull("Profile response element should not be null", resp.getProfileResults());
493+
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));
494+
495+
for (Map.Entry<String, ProfileShardResult> shardResult : resp.getProfileResults().entrySet()) {
496+
assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
497+
assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
498+
for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) {
499+
List<ProfileResult> results = searchProfiles.getQueryResults();
500+
for (ProfileResult result : results) {
501+
assertNotNull(result.getQueryName());
502+
assertNotNull(result.getLuceneDescription());
503+
assertThat(result.getTime(), greaterThan(0L));
504+
Map<String, Long> breakdown = result.getTimeBreakdown();
505+
Long maxSliceTime = result.getMaxSliceTime();
506+
Long minSliceTime = result.getMinSliceTime();
507+
Long avgSliceTime = result.getAvgSliceTime();
508+
if (concurrentSearchEnabled && results.get(0).equals(result)) {
509+
assertNotNull(maxSliceTime);
510+
assertNotNull(minSliceTime);
511+
assertNotNull(avgSliceTime);
512+
assertThat(breakdown.size(), equalTo(66));
513+
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
514+
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
515+
String maxTimingType = MAX_PREFIX + queryTimingType;
516+
String minTimingType = MIN_PREFIX + queryTimingType;
517+
String avgTimingType = AVG_PREFIX + queryTimingType;
518+
assertNotNull(breakdown.get(maxTimingType));
519+
assertNotNull(breakdown.get(minTimingType));
520+
assertNotNull(breakdown.get(avgTimingType));
521+
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
522+
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
523+
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
524+
}
525+
}
526+
} else if (concurrentSearchEnabled) {
527+
assertThat(maxSliceTime, equalTo(0L));
528+
assertThat(minSliceTime, equalTo(0L));
529+
assertThat(avgSliceTime, equalTo(0L));
530+
assertThat(breakdown.size(), equalTo(27));
531+
} else {
532+
assertThat(maxSliceTime, is(nullValue()));
533+
assertThat(minSliceTime, is(nullValue()));
534+
assertThat(avgSliceTime, is(nullValue()));
535+
assertThat(breakdown.size(), equalTo(27));
536+
}
537+
}
538+
539+
CollectorResult result = searchProfiles.getCollectorResult();
540+
assertThat(result.getName(), is(not(emptyOrNullString())));
541+
assertThat(result.getTime(), greaterThan(0L));
542+
}
543+
}
544+
}
545+
463546
public void testDisMaxRange() throws Exception {
464547
createIndex("test");
465548
ensureGreen();

server/src/main/java/org/opensearch/search/profile/Profilers.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import org.opensearch.search.internal.ContextIndexSearcher;
3636
import org.opensearch.search.profile.aggregation.AggregationProfiler;
3737
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
38+
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
39+
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
40+
import org.opensearch.search.profile.query.InternalQueryProfileTree;
3841
import org.opensearch.search.profile.query.QueryProfiler;
3942

4043
import java.util.ArrayList;
@@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc
6467

6568
/** Switch to a new profile. */
6669
public QueryProfiler addQueryProfiler() {
67-
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
70+
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
71+
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
72+
: new QueryProfiler(new InternalQueryProfileTree());
6873
searcher.setProfiler(profiler);
6974
queryProfilers.add(profiler);
7075
return profiler;

server/src/main/java/org/opensearch/search/profile/Timer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ public class Timer {
5353
private boolean doTiming;
5454
private long timing, count, lastCount, start, earliestTimerStartTime;
5555

56+
public Timer() {
57+
this(0, 0, 0, 0, 0);
58+
}
59+
60+
public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
61+
this.timing = timing;
62+
this.count = count;
63+
this.lastCount = lastCount;
64+
this.start = start;
65+
this.earliestTimerStartTime = earliestTimerStartTime;
66+
}
67+
5668
/** pkg-private for testing */
5769
long nanoTime() {
5870
return System.nanoTime();

server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,11 @@ public void startRewriteTime() {
5454
* startRewriteTime() must be called for a particular context prior to calling
5555
* stopAndAddRewriteTime(), otherwise the elapsed time will be negative and
5656
* nonsensical
57-
*
58-
* @return The elapsed time
5957
*/
60-
public long stopAndAddRewriteTime() {
58+
public void stopAndAddRewriteTime() {
6159
long time = Math.max(1, System.nanoTime() - rewriteScratch);
6260
rewriteTime += time;
6361
rewriteScratch = 0;
64-
return time;
6562
}
6663

6764
public long getRewriteTime() {

server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,29 @@ public Map<String, Long> toBreakdownMap() {
7070
);
7171
final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString());
7272

73-
if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) {
73+
if (contexts.isEmpty()) {
7474
// If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the
7575
// create_weight time/count
7676
queryNodeTime = createWeightTime;
7777
maxSliceNodeTime = 0L;
7878
minSliceNodeTime = 0L;
7979
avgSliceNodeTime = 0L;
8080
return buildDefaultQueryBreakdownMap(createWeightTime);
81+
} else if (sliceCollectorsToLeaves.isEmpty()) {
82+
// This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It
83+
// creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for
84+
// the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later
85+
// in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no
86+
// concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination.
87+
AbstractProfileBreakdown<QueryTimingType> breakdown = contexts.values().iterator().next();
88+
queryNodeTime = breakdown.toNodeTime() + createWeightTime;
89+
maxSliceNodeTime = 0L;
90+
minSliceNodeTime = 0L;
91+
avgSliceNodeTime = 0L;
92+
Map<String, Long> queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap());
93+
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime);
94+
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L);
95+
return queryBreakdownMap;
8196
}
8297

8398
// first create the slice level breakdowns
@@ -191,10 +206,12 @@ Map<Collector, Map<String, Long>> buildSliceLevelBreakdown() {
191206
}
192207
// compute sliceMaxEndTime as max of sliceEndTime across all timing types
193208
sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE));
194-
sliceMinStartTime = Math.min(
195-
sliceMinStartTime,
196-
currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE)
197-
);
209+
long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE);
210+
if (currentSliceStartTime == 0L) {
211+
// The timer for the current timing type never starts, so we continue here
212+
continue;
213+
}
214+
sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime);
198215
// compute total time for each timing type at slice level using sliceEndTime and sliceStartTime
199216
currentSliceBreakdown.put(
200217
timingType.toString(),
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.profile.query;
10+
11+
import org.apache.lucene.search.Query;
12+
import org.opensearch.search.profile.ContextualProfileBreakdown;
13+
import org.opensearch.search.profile.ProfileResult;
14+
import org.opensearch.search.profile.Timer;
15+
16+
import java.util.ArrayList;
17+
import java.util.Collections;
18+
import java.util.Comparator;
19+
import java.util.LinkedHashMap;
20+
import java.util.LinkedList;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
25+
/**
26+
* This class acts as a thread-local storage for profiling a query with concurrent execution
27+
*
28+
* @opensearch.internal
29+
*/
30+
public final class ConcurrentQueryProfiler extends QueryProfiler {
31+
32+
private final Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;
33+
// The LinkedList does not need to be thread safe, as the map associates thread IDs with LinkedList, and only
34+
// one thread will access the LinkedList at a time.
35+
private final Map<Long, LinkedList<Timer>> threadToRewriteTimers;
36+
37+
public ConcurrentQueryProfiler(AbstractQueryProfileTree profileTree) {
38+
super(profileTree);
39+
long threadId = getCurrentThreadId();
40+
// We utilize LinkedHashMap to preserve the insertion order of the profiled queries
41+
threadToProfileTree = Collections.synchronizedMap(new LinkedHashMap<>());
42+
threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree);
43+
threadToRewriteTimers = new ConcurrentHashMap<>();
44+
threadToRewriteTimers.put(threadId, new LinkedList<>());
45+
}
46+
47+
@Override
48+
public ContextualProfileBreakdown<QueryTimingType> getQueryBreakdown(Query query) {
49+
ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent(
50+
getCurrentThreadId(),
51+
k -> new ConcurrentQueryProfileTree()
52+
);
53+
return profileTree.getProfileBreakdown(query);
54+
}
55+
56+
/**
57+
* Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack.
58+
*/
59+
@Override
60+
public void pollLastElement() {
61+
ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId());
62+
if (concurrentProfileTree != null) {
63+
concurrentProfileTree.pollLast();
64+
}
65+
}
66+
67+
/**
68+
* @return a hierarchical representation of the profiled tree
69+
*/
70+
@Override
71+
public List<ProfileResult> getTree() {
72+
List<ProfileResult> profileResults = new ArrayList<>();
73+
for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
74+
profileResults.addAll(profile.getValue().getTree());
75+
}
76+
return profileResults;
77+
}
78+
79+
/**
80+
* Begin timing the rewrite phase of a request
81+
*/
82+
@Override
83+
public void startRewriteTime() {
84+
Timer rewriteTimer = new Timer();
85+
threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer);
86+
rewriteTimer.start();
87+
}
88+
89+
/**
90+
* Stop recording the current rewrite timer
91+
*/
92+
public void stopAndAddRewriteTime() {
93+
Timer rewriteTimer = threadToRewriteTimers.get(getCurrentThreadId()).getLast();
94+
rewriteTimer.stop();
95+
}
96+
97+
/**
98+
* @return total time taken to rewrite all queries in this concurrent query profiler
99+
*/
100+
@Override
101+
public long getRewriteTime() {
102+
long totalRewriteTime = 0L;
103+
List<Timer> rewriteTimers = new LinkedList<>();
104+
threadToRewriteTimers.values().forEach(rewriteTimers::addAll);
105+
LinkedList<long[]> mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers);
106+
for (long[] interval : mergedIntervals) {
107+
totalRewriteTime += interval[1] - interval[0];
108+
}
109+
return totalRewriteTime;
110+
}
111+
112+
// package private for unit testing
113+
LinkedList<long[]> mergeRewriteTimeIntervals(List<Timer> timers) {
114+
LinkedList<long[]> mergedIntervals = new LinkedList<>();
115+
timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime));
116+
for (Timer timer : timers) {
117+
long startTime = timer.getEarliestTimerStartTime();
118+
long endTime = startTime + timer.getApproximateTiming();
119+
if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) {
120+
long[] interval = new long[2];
121+
interval[0] = startTime;
122+
interval[1] = endTime;
123+
mergedIntervals.add(interval);
124+
} else {
125+
mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime);
126+
}
127+
}
128+
return mergedIntervals;
129+
}
130+
131+
private long getCurrentThreadId() {
132+
return Thread.currentThread().getId();
133+
}
134+
}

server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@
5151
*
5252
* @opensearch.internal
5353
*/
54-
public final class QueryProfiler extends AbstractProfiler<ContextualProfileBreakdown<QueryTimingType>, Query> {
54+
public class QueryProfiler extends AbstractProfiler<ContextualProfileBreakdown<QueryTimingType>, Query> {
5555

5656
/**
5757
* The root Collector used in the search
5858
*/
5959
private InternalProfileComponent collector;
6060

61-
public QueryProfiler(boolean concurrent) {
62-
super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree());
61+
public QueryProfiler(AbstractQueryProfileTree profileTree) {
62+
super(profileTree);
6363
}
6464

6565
/** Set the collector that is associated with this profiler. */
@@ -81,14 +81,14 @@ public void startRewriteTime() {
8181
/**
8282
* Stop recording the current rewrite and add its time to the total tally, returning the
8383
* cumulative time so far.
84-
*
85-
* @return cumulative rewrite time
8684
*/
87-
public long stopAndAddRewriteTime() {
88-
return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime();
85+
public void stopAndAddRewriteTime() {
86+
((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime();
8987
}
9088

9189
/**
90+
* The rewriting process is complex and hard to display because queries can undergo significant changes.
91+
* Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case.
9292
* @return total time taken to rewrite all queries in this profile
9393
*/
9494
public long getRewriteTime() {

0 commit comments

Comments
 (0)