Skip to content

Commit ad5028a

Browse files
committed
Fix rewrite time issue in concurrent path
Signed-off-by: Ticheng Lin <ticheng@amazon.com>
1 parent d5a95b8 commit ad5028a

File tree

5 files changed

+144
-6
lines changed

5 files changed

+144
-6
lines changed

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@
8686
import java.util.List;
8787
import java.util.Objects;
8888
import java.util.Set;
89+
import java.util.Map;
90+
import java.util.concurrent.ConcurrentHashMap;
8991
import java.util.concurrent.Executor;
9092

9193
/**
@@ -106,6 +108,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
106108
private QueryProfiler profiler;
107109
private MutableQueryTimeout cancellable;
108110
private SearchContext searchContext;
111+
private boolean fromConcurrentPath = false;
112+
private Map<Long, Timer> threadToRewriteTimer = new ConcurrentHashMap<>();
109113

110114
public ContextIndexSearcher(
111115
IndexReader reader,
@@ -185,15 +189,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
185189

186190
@Override
187191
public Query rewrite(Query original) throws IOException {
192+
Timer concurrentPathRewriteTimer = null;
193+
long threadId = Thread.currentThread().getId();
188194
if (profiler != null) {
189-
profiler.startRewriteTime();
195+
if (fromConcurrentPath) {
196+
concurrentPathRewriteTimer = threadToRewriteTimer.computeIfAbsent(threadId, k -> new Timer());
197+
concurrentPathRewriteTimer.start();
198+
} else {
199+
profiler.startRewriteTime();
200+
}
190201
}
191202

192203
try {
193204
return super.rewrite(original);
194205
} finally {
195206
if (profiler != null) {
196-
profiler.stopAndAddRewriteTime();
207+
if (fromConcurrentPath) {
208+
concurrentPathRewriteTimer.stop();
209+
} else {
210+
profiler.stopAndAddRewriteTime();
211+
}
197212
}
198213
}
199214
}
@@ -271,6 +286,11 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
271286
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
272287
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
273288
// reader order here.
289+
if (searchContext.shouldUseConcurrentSearch()) {
290+
fromConcurrentPath = true;
291+
threadToRewriteTimer = ((ProfileWeight) weight).getThreadToRewriteTimer();
292+
}
293+
274294
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
275295
for (int i = leaves.size() - 1; i >= 0; i--) {
276296
searchLeaf(leaves.get(i), weight, collector);

server/src/main/java/org/opensearch/search/profile/ContextualProfileBreakdown.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@ public ContextualProfileBreakdown(Class<T> clazz) {
3535
public void associateCollectorToLeaves(Collector collector, LeafReaderContext leaf) {}
3636

3737
public void associateCollectorsToLeaves(Map<Collector, List<LeafReaderContext>> collectorToLeaves) {}
38+
39+
// Base implementation: this breakdown tracks no per-thread rewrite timers,
// so it returns null. ConcurrentQueryProfileBreakdown provides a real map;
// NOTE(review): callers appear to reach this only on the concurrent path
// (gated by shouldUseConcurrentSearch()) — confirm no caller dereferences
// the null base-class result.
public Map<Long, Timer> getThreadToRewriteTimer() {
    return null;
}
3842
}

server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.OpenSearchException;
1414
import org.opensearch.search.profile.AbstractProfileBreakdown;
1515
import org.opensearch.search.profile.ContextualProfileBreakdown;
16+
import org.opensearch.search.profile.Timer;
1617

1718
import java.util.ArrayList;
1819
import java.util.Collections;
@@ -42,6 +43,8 @@ public final class ConcurrentQueryProfileBreakdown extends ContextualProfileBrea
4243
// keep track of all breakdown timings per segment. package-private for testing
4344
private final Map<Object, AbstractProfileBreakdown<QueryTimingType>> contexts = new ConcurrentHashMap<>();
4445

46+
private final Map<Long, Timer> threadToRewriteTimer = new ConcurrentHashMap<>();
47+
4548
// represents slice to leaves mapping as for each slice a unique collector instance is created
4649
private final Map<Collector, List<LeafReaderContext>> sliceCollectorsToLeaves = new ConcurrentHashMap<>();
4750

@@ -50,6 +53,10 @@ public ConcurrentQueryProfileBreakdown() {
5053
super(QueryTimingType.class);
5154
}
5255

56+
public Map<Long, Timer> getThreadToRewriteTimer() {
57+
return threadToRewriteTimer;
58+
}
59+
5360
@Override
5461
public AbstractProfileBreakdown<QueryTimingType> context(Object context) {
5562
// See please https://bugs.openjdk.java.net/browse/JDK-8161372
@@ -93,11 +100,11 @@ public Map<String, Long> toBreakdownMap() {
93100
*/
94101
private Map<String, Long> buildDefaultQueryBreakdownMap(long createWeightTime) {
95102
final Map<String, Long> concurrentQueryBreakdownMap = new HashMap<>();
96-
for (QueryTimingType timingType : QueryTimingType.values()) {
103+
for (ConcurrentQueryTimingType timingType : ConcurrentQueryTimingType.values()) {
97104
final String timingTypeKey = timingType.toString();
98105
final String timingTypeCountKey = timingTypeKey + TIMING_TYPE_COUNT_SUFFIX;
99106

100-
if (timingType.equals(QueryTimingType.CREATE_WEIGHT)) {
107+
if (timingType.equals(ConcurrentQueryTimingType.CREATE_WEIGHT)) {
101108
concurrentQueryBreakdownMap.put(timingTypeKey, createWeightTime);
102109
concurrentQueryBreakdownMap.put(timingTypeCountKey, 1L);
103110
continue;
@@ -248,7 +255,7 @@ public Map<String, Long> buildQueryBreakdownMap(
248255
) {
249256
final Map<String, Long> queryBreakdownMap = new HashMap<>();
250257
long queryEndTime = Long.MIN_VALUE;
251-
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
258+
for (ConcurrentQueryTimingType queryTimingType : ConcurrentQueryTimingType.values()) {
252259
final String timingTypeKey = queryTimingType.toString();
253260
final String timingTypeCountKey = timingTypeKey + TIMING_TYPE_COUNT_SUFFIX;
254261
final String sliceEndTimeForTimingType = timingTypeKey + SLICE_END_TIME_SUFFIX;
@@ -266,12 +273,76 @@ public Map<String, Long> buildQueryBreakdownMap(
266273
long queryTimingTypeCount = 0L;
267274

268275
// the create weight time is computed at the query level and is called only once per query
269-
if (queryTimingType == QueryTimingType.CREATE_WEIGHT) {
276+
if (queryTimingType == ConcurrentQueryTimingType.CREATE_WEIGHT) {
270277
queryBreakdownMap.put(timingTypeCountKey, 1L);
271278
queryBreakdownMap.put(timingTypeKey, createWeightTime);
272279
continue;
273280
}
274281

282+
if (queryTimingType == ConcurrentQueryTimingType.REWRITE) {
283+
if (threadToRewriteTimer.isEmpty()) {
284+
// add time related stats
285+
queryBreakdownMap.put(timingTypeKey, 0L);
286+
queryBreakdownMap.put(maxBreakdownTypeTime, 0L);
287+
queryBreakdownMap.put(minBreakdownTypeTime, 0L);
288+
queryBreakdownMap.put(avgBreakdownTypeTime, 0L);
289+
// add count related stats
290+
queryBreakdownMap.put(timingTypeCountKey, 0L);
291+
queryBreakdownMap.put(maxBreakdownTypeCount, 0L);
292+
queryBreakdownMap.put(minBreakdownTypeCount, 0L);
293+
queryBreakdownMap.put(avgBreakdownTypeCount, 0L);
294+
continue;
295+
}
296+
for (Map.Entry<Long, Timer> rewrite : threadToRewriteTimer.entrySet()) {
297+
long sliceRewriteTime = rewrite.getValue().getApproximateTiming();
298+
long sliceRewriteCount = rewrite.getValue().getCount();
299+
long sliceRewriteStartTime = rewrite.getValue().getEarliestTimerStartTime();
300+
// compute max/min/avg rewrite time across slices
301+
queryBreakdownMap.compute(
302+
maxBreakdownTypeTime,
303+
(key, value) -> (value == null) ? sliceRewriteTime : Math.max(sliceRewriteTime, value)
304+
);
305+
queryBreakdownMap.compute(
306+
minBreakdownTypeTime,
307+
(key, value) -> (value == null) ? sliceRewriteTime : Math.min(sliceRewriteTime, value)
308+
);
309+
queryBreakdownMap.compute(
310+
avgBreakdownTypeTime,
311+
(key, value) -> (value == null) ? sliceRewriteTime : sliceRewriteTime + value
312+
);
313+
314+
// compute max/min/avg rewrite count across slices
315+
queryBreakdownMap.compute(
316+
maxBreakdownTypeCount,
317+
(key, value) -> (value == null) ? sliceRewriteCount : Math.max(sliceRewriteCount, value)
318+
);
319+
queryBreakdownMap.compute(
320+
minBreakdownTypeCount,
321+
(key, value) -> (value == null) ? sliceRewriteCount : Math.min(sliceRewriteCount, value)
322+
);
323+
queryBreakdownMap.compute(
324+
avgBreakdownTypeCount,
325+
(key, value) -> (value == null) ? sliceRewriteCount : sliceRewriteCount + value
326+
);
327+
328+
// query start/end time for rewrite is min/max of start/end time across slices for that TimingType
329+
queryTimingTypeEndTime = Math.max(
330+
queryTimingTypeEndTime,
331+
sliceRewriteStartTime + sliceRewriteTime
332+
);
333+
queryTimingTypeStartTime = Math.min(
334+
queryTimingTypeStartTime,
335+
sliceRewriteStartTime
336+
);
337+
queryTimingTypeCount += sliceRewriteCount;
338+
}
339+
queryBreakdownMap.put(timingTypeKey, queryTimingTypeEndTime - queryTimingTypeStartTime);
340+
queryBreakdownMap.put(timingTypeCountKey, queryTimingTypeCount);
341+
queryBreakdownMap.compute(avgBreakdownTypeTime, (key, value) -> (value == null) ? 0L : value / threadToRewriteTimer.size());
342+
queryBreakdownMap.compute(avgBreakdownTypeCount, (key, value) -> (value == null) ? 0L : value / threadToRewriteTimer.size());
343+
continue;
344+
}
345+
275346
// for all other timing types, we will compute min/max/avg/total across slices
276347
for (Map.Entry<Collector, Map<String, Long>> sliceBreakdown : sliceLevelBreakdowns.entrySet()) {
277348
long sliceBreakdownTypeTime = sliceBreakdown.getValue().getOrDefault(timingTypeKey, 0L);
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Modifications Copyright OpenSearch Contributors. See
11+
* GitHub history for details.
12+
*/
13+
14+
package org.opensearch.search.profile.query;
15+
16+
import java.util.Locale;
17+
18+
/**
 * Timing points profiled for a query executed with concurrent segment search.
 *
 * <p>{@link #toString()} yields the lower-case enum name, which is used as the
 * key in the profile breakdown maps (e.g. {@code "create_weight"}).
 *
 * @opensearch.internal
 */
public enum ConcurrentQueryTimingType {
    REWRITE,
    CREATE_WEIGHT,
    BUILD_SCORER,
    NEXT_DOC,
    ADVANCE,
    MATCH,
    SCORE,
    SHALLOW_ADVANCE,
    COMPUTE_MAX_SCORE,
    SET_MIN_COMPETITIVE_SCORE;

    @Override
    public String toString() {
        // Locale.ROOT keeps the breakdown-map key stable regardless of the
        // JVM's default locale (avoids e.g. the Turkish dotless-i problem).
        return name().toLowerCase(Locale.ROOT);
    }
}

server/src/main/java/org/opensearch/search/profile/query/ProfileWeight.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.search.profile.Timer;
4545

4646
import java.io.IOException;
47+
import java.util.Map;
4748

4849
/**
4950
* Weight wrapper that will compute how much time it takes to build the
@@ -141,4 +142,8 @@ public boolean isCacheable(LeafReaderContext ctx) {
141142
public void associateCollectorToLeaves(LeafReaderContext leaf, Collector collector) {
142143
profile.associateCollectorToLeaves(collector, leaf);
143144
}
145+
146+
public Map<Long, Timer> getThreadToRewriteTimer() {
147+
return profile.getThreadToRewriteTimer();
148+
}
144149
}

0 commit comments

Comments
 (0)