Skip to content

Commit 5b2919f

Browse files
committed
Refactor and work on the PR comments (#10352)
Signed-off-by: Ticheng Lin <ticheng@amazon.com>
1 parent 523fb7e commit 5b2919f

File tree

12 files changed: +357 −151 lines changed

server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,89 @@ public void testBoosting() throws Exception {
460460
}
461461
}
462462

463+
/**
 * Verifies profile output when each slice searches its own leaves and the query is
 * rewritten per leaf: the first profiled result of a concurrent search carries the
 * full 66-entry breakdown (max/min/avg slice stats per timing type), later results
 * carry the 27-entry per-leaf breakdown with zeroed slice times, and non-concurrent
 * search has no slice stats at all.
 */
public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
    createIndex("test");
    ensureGreen();

    int numDocs = 122;
    IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
    for (int i = 0; i < numDocs; i++) {
        docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
    }

    List<String> terms = Arrays.asList("zero", "zero", "one");

    indexRandom(true, docs);

    refresh();

    // Boosting query whose positive ids clause forces a per-leaf rewrite.
    QueryBuilder q = QueryBuilders.boostingQuery(
        QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())),
        QueryBuilders.termsQuery("field1", terms)
    ).boost(randomFloat()).negativeBoost(randomFloat());
    logger.info("Query: {}", q);

    SearchResponse resp = client().prepareSearch()
        .setQuery(q)
        .setTrackTotalHits(true)
        .setProfile(true)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .get();

    assertNotNull("Profile response element should not be null", resp.getProfileResults());
    assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));

    for (Map.Entry<String, ProfileShardResult> shardResult : resp.getProfileResults().entrySet()) {
        assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
        assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
        for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) {
            List<ProfileResult> results = searchProfiles.getQueryResults();
            for (ProfileResult result : results) {
                assertNotNull(result.getQueryName());
                assertNotNull(result.getLuceneDescription());
                assertThat(result.getTime(), greaterThan(0L));
                Map<String, Long> breakdown = result.getTimeBreakdown();
                Long maxSliceTime = result.getMaxSliceTime();
                Long minSliceTime = result.getMinSliceTime();
                Long avgSliceTime = result.getAvgSliceTime();
                if (concurrentSearchEnabled && results.get(0).equals(result)) {
                    // First (top-level) result of a concurrent search: slice stats present,
                    // and every non-CREATE_WEIGHT timing type has max/min/avg time + count keys.
                    assertNotNull(maxSliceTime);
                    assertNotNull(minSliceTime);
                    assertNotNull(avgSliceTime);
                    assertThat(breakdown.size(), equalTo(66));
                    for (QueryTimingType queryTimingType : QueryTimingType.values()) {
                        if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
                            String maxTimingType = MAX_PREFIX + queryTimingType;
                            String minTimingType = MIN_PREFIX + queryTimingType;
                            String avgTimingType = AVG_PREFIX + queryTimingType;
                            assertNotNull(breakdown.get(maxTimingType));
                            assertNotNull(breakdown.get(minTimingType));
                            assertNotNull(breakdown.get(avgTimingType));
                            assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
                            assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
                            assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
                        }
                    }
                } else if (concurrentSearchEnabled) {
                    // Per-leaf rewritten-query results: slice times are zeroed, smaller breakdown.
                    assertThat(maxSliceTime, equalTo(0L));
                    assertThat(minSliceTime, equalTo(0L));
                    assertThat(avgSliceTime, equalTo(0L));
                    assertThat(breakdown.size(), equalTo(27));
                } else {
                    // Sequential search path: no slice statistics at all.
                    assertThat(maxSliceTime, is(nullValue()));
                    assertThat(minSliceTime, is(nullValue()));
                    assertThat(avgSliceTime, is(nullValue()));
                    assertThat(breakdown.size(), equalTo(27));
                }
            }

            CollectorResult result = searchProfiles.getCollectorResult();
            assertThat(result.getName(), is(not(emptyOrNullString())));
            assertThat(result.getTime(), greaterThan(0L));
        }
    }
}
545+
463546
public void testDisMaxRange() throws Exception {
464547
createIndex("test");
465548
ensureGreen();

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import org.opensearch.search.dfs.AggregatedDfs;
7272
import org.opensearch.search.profile.ContextualProfileBreakdown;
7373
import org.opensearch.search.profile.Timer;
74-
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
7574
import org.opensearch.search.profile.query.ProfileWeight;
7675
import org.opensearch.search.profile.query.QueryProfiler;
7776
import org.opensearch.search.profile.query.QueryTimingType;
@@ -107,7 +106,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
107106
private QueryProfiler profiler;
108107
private MutableQueryTimeout cancellable;
109108
private SearchContext searchContext;
110-
private boolean fromConcurrentPath = false;
111109

112110
public ContextIndexSearcher(
113111
IndexReader reader,
@@ -187,26 +185,15 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
187185

188186
@Override
189187
public Query rewrite(Query original) throws IOException {
190-
Timer concurrentPathRewriteTimer = null;
191188
if (profiler != null) {
192-
if (fromConcurrentPath) {
193-
concurrentPathRewriteTimer = new Timer();
194-
profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer);
195-
concurrentPathRewriteTimer.start();
196-
} else {
197-
profiler.startRewriteTime();
198-
}
189+
profiler.startRewriteTime();
199190
}
200191

201192
try {
202193
return super.rewrite(original);
203194
} finally {
204195
if (profiler != null) {
205-
if (fromConcurrentPath) {
206-
concurrentPathRewriteTimer.stop();
207-
} else {
208-
profiler.stopAndAddRewriteTime();
209-
}
196+
profiler.stopAndAddRewriteTime();
210197
}
211198
}
212199
}
@@ -217,15 +204,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
217204
// createWeight() is called for each query in the tree, so we tell the queryProfiler
218205
// each invocation so that it can build an internal representation of the query
219206
// tree
220-
ContextualProfileBreakdown<QueryTimingType> profile;
221-
if (searchContext.shouldUseConcurrentSearch()) {
222-
long threadId = Thread.currentThread().getId();
223-
ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree()
224-
.computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree());
225-
profile = profileTree.getProfileBreakdown(query);
226-
} else {
227-
profile = profiler.getQueryBreakdown(query);
228-
}
207+
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
229208
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
230209
timer.start();
231210
final Weight weight;
@@ -288,9 +267,6 @@ public void search(
288267

289268
@Override
290269
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
291-
if (searchContext.shouldUseConcurrentSearch()) {
292-
fromConcurrentPath = true;
293-
}
294270
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
295271
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
296272
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf

server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@
3232

3333
package org.opensearch.search.profile;
3434

35-
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
36-
37-
import java.util.ArrayList;
3835
import java.util.List;
39-
import java.util.Map;
4036

4137
/**
4238
* Base class for a profiler
@@ -46,7 +42,6 @@
4642
public class AbstractProfiler<PB extends AbstractProfileBreakdown<?>, E> {
4743

4844
protected final AbstractInternalProfileTree<PB, E> profileTree;
49-
protected Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;
5045

5146
public AbstractProfiler(AbstractInternalProfileTree<PB, E> profileTree) {
5247
this.profileTree = profileTree;
@@ -64,27 +59,14 @@ public PB getQueryBreakdown(E query) {
6459
* Removes the last (e.g. most recent) element on the stack.
6560
*/
6661
public void pollLastElement() {
67-
if (threadToProfileTree == null) {
68-
profileTree.pollLast();
69-
} else {
70-
long threadId = Thread.currentThread().getId();
71-
ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId);
72-
concurrentProfileTree.pollLast();
73-
}
62+
profileTree.pollLast();
7463
}
7564

7665
/**
7766
* @return a hierarchical representation of the profiled tree
7867
*/
7968
public List<ProfileResult> getTree() {
80-
if (threadToProfileTree == null) {
81-
return profileTree.getTree();
82-
}
83-
List<ProfileResult> profileResults = new ArrayList<>();
84-
for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
85-
profileResults.addAll(profile.getValue().getTree());
86-
}
87-
return profileResults;
69+
return profileTree.getTree();
8870
}
8971

9072
}

server/src/main/java/org/opensearch/search/profile/Profilers.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import org.opensearch.search.internal.ContextIndexSearcher;
3636
import org.opensearch.search.profile.aggregation.AggregationProfiler;
3737
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
38+
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
39+
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
40+
import org.opensearch.search.profile.query.InternalQueryProfileTree;
3841
import org.opensearch.search.profile.query.QueryProfiler;
3942

4043
import java.util.ArrayList;
@@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc
6467

6568
/** Switch to a new profile. */
6669
public QueryProfiler addQueryProfiler() {
67-
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
70+
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
71+
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
72+
: new QueryProfiler(new InternalQueryProfileTree());
6873
searcher.setProfiler(profiler);
6974
queryProfilers.add(profiler);
7075
return profiler;

server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,11 @@ public void startRewriteTime() {
5454
* startRewriteTime() must be called for a particular context prior to calling
5555
* stopAndAddRewriteTime(), otherwise the elapsed time will be negative and
5656
* nonsensical
57-
*
58-
* @return The elapsed time
5957
*/
60-
public long stopAndAddRewriteTime() {
58+
public void stopAndAddRewriteTime() {
6159
long time = Math.max(1, System.nanoTime() - rewriteScratch);
6260
rewriteTime += time;
6361
rewriteScratch = 0;
64-
return time;
6562
}
6663

6764
public long getRewriteTime() {

server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,29 @@ public Map<String, Long> toBreakdownMap() {
7070
);
7171
final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString());
7272

73-
if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) {
73+
if (contexts.isEmpty()) {
7474
// If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the
7575
// create_weight time/count
7676
queryNodeTime = createWeightTime;
7777
maxSliceNodeTime = 0L;
7878
minSliceNodeTime = 0L;
7979
avgSliceNodeTime = 0L;
8080
return buildDefaultQueryBreakdownMap(createWeightTime);
81+
} else if (sliceCollectorsToLeaves.isEmpty()) {
82+
// This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It
83+
// creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for
84+
// the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later
85+
// in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no
86+
// concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination.
87+
AbstractProfileBreakdown<QueryTimingType> breakdown = contexts.values().iterator().next();
88+
queryNodeTime = breakdown.toNodeTime() + createWeightTime;
89+
maxSliceNodeTime = 0L;
90+
minSliceNodeTime = 0L;
91+
avgSliceNodeTime = 0L;
92+
Map<String, Long> queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap());
93+
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime);
94+
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L);
95+
return queryBreakdownMap;
8196
}
8297

8398
// first create the slice level breakdowns
@@ -191,10 +206,12 @@ Map<Collector, Map<String, Long>> buildSliceLevelBreakdown() {
191206
}
192207
// compute sliceMaxEndTime as max of sliceEndTime across all timing types
193208
sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE));
194-
sliceMinStartTime = Math.min(
195-
sliceMinStartTime,
196-
currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE)
197-
);
209+
long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE);
210+
if (currentSliceStartTime == 0L) {
211+
// The timer for the current timing type never starts, so we continue here
212+
continue;
213+
}
214+
sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime);
198215
// compute total time for each timing type at slice level using sliceEndTime and sliceStartTime
199216
currentSliceBreakdown.put(
200217
timingType.toString(),

server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
import org.apache.lucene.search.Collector;
1313
import org.opensearch.search.profile.ContextualProfileBreakdown;
1414
import org.opensearch.search.profile.ProfileResult;
15-
import org.opensearch.search.profile.Timer;
1615

17-
import java.util.ArrayList;
1816
import java.util.List;
1917
import java.util.Map;
2018

@@ -24,7 +22,6 @@
2422
* @opensearch.internal
2523
*/
2624
public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree {
27-
protected List<Timer> concurrentPathRewriteTimers = new ArrayList<>();
2825

2926
@Override
3027
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
@@ -91,11 +88,4 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map<
9188
}
9289
}
9390
}
94-
95-
/**
96-
* @return the concurrent path rewrite timer list for this profile tree
97-
*/
98-
public List<Timer> getConcurrentPathRewriteTimers() {
99-
return concurrentPathRewriteTimers;
100-
}
10191
}

0 commit comments

Comments (0)