Skip to content

Commit 7f93574

Browse files
committed
Fix timer race condition in profile rewrite and create weight for concurrent segment search (#10352)
Signed-off-by: Ticheng Lin <ticheng@amazon.com>
1 parent e389a09 commit 7f93574

File tree

8 files changed

+217
-8
lines changed

8 files changed

+217
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9191
### Added
9292
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
9393
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
94+
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
9495

9596
### Dependencies
9697
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))

server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.search.profile.query;
3434

35+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
36+
3537
import org.apache.lucene.tests.util.English;
3638
import org.opensearch.action.index.IndexRequestBuilder;
3739
import org.opensearch.action.search.MultiSearchResponse;
@@ -40,29 +42,56 @@
4042
import org.opensearch.action.search.SearchType;
4143
import org.opensearch.action.search.ShardSearchFailure;
4244
import org.opensearch.common.settings.Settings;
45+
import org.opensearch.common.util.FeatureFlags;
4346
import org.opensearch.index.query.QueryBuilder;
4447
import org.opensearch.index.query.QueryBuilders;
4548
import org.opensearch.search.SearchHit;
4649
import org.opensearch.search.profile.ProfileResult;
4750
import org.opensearch.search.profile.ProfileShardResult;
4851
import org.opensearch.search.sort.SortOrder;
49-
import org.opensearch.test.OpenSearchIntegTestCase;
52+
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;
5053

5154
import java.util.Arrays;
55+
import java.util.Collection;
5256
import java.util.HashSet;
5357
import java.util.List;
5458
import java.util.Map;
5559
import java.util.Set;
5660

61+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
5762
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
5863
import static org.hamcrest.Matchers.emptyOrNullString;
5964
import static org.hamcrest.Matchers.equalTo;
6065
import static org.hamcrest.Matchers.greaterThan;
6166
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
6267
import static org.hamcrest.Matchers.is;
6368
import static org.hamcrest.Matchers.not;
69+
import static org.hamcrest.Matchers.nullValue;
70+
71+
public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
72+
private final boolean concurrentSearchEnabled;
73+
private static final String MAX_PREFIX = "max_";
74+
private static final String MIN_PREFIX = "min_";
75+
private static final String AVG_PREFIX = "avg_";
76+
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";
77+
/**
 * Creates one parameterized instance of this test suite.
 *
 * @param settings                settings handed to the parameterized base class
 * @param concurrentSearchEnabled remembered so the profile assertions can pick the expectations
 *                                that match the parameter set being run
 */
public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
    super(settings);
    this.concurrentSearchEnabled = concurrentSearchEnabled;
}
82+
83+
@ParametersFactory
84+
public static Collection<Object[]> parameters() {
85+
return Arrays.asList(
86+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false },
87+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true }
88+
);
89+
}
6490

65-
public class QueryProfilerIT extends OpenSearchIntegTestCase {
91+
@Override
92+
protected Settings featureFlagSettings() {
93+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
94+
}
6695

6796
/**
6897
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
@@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
229258
assertEquals(result.getLuceneDescription(), "field1:one");
230259
assertThat(result.getTime(), greaterThan(0L));
231260
assertNotNull(result.getTimeBreakdown());
261+
assertQueryProfileResult(result);
232262
}
233263

234264
CollectorResult result = searchProfiles.getCollectorResult();
@@ -271,6 +301,7 @@ public void testBool() throws Exception {
271301
assertThat(result.getTime(), greaterThan(0L));
272302
assertNotNull(result.getTimeBreakdown());
273303
assertEquals(result.getProfiledChildren().size(), 2);
304+
assertQueryProfileResult(result);
274305

275306
// Check the children
276307
List<ProfileResult> children = result.getProfiledChildren();
@@ -282,12 +313,14 @@ public void testBool() throws Exception {
282313
assertThat(childProfile.getTime(), greaterThan(0L));
283314
assertNotNull(childProfile.getTimeBreakdown());
284315
assertEquals(childProfile.getProfiledChildren().size(), 0);
316+
assertQueryProfileResult(childProfile);
285317

286318
childProfile = children.get(1);
287319
assertEquals(childProfile.getQueryName(), "TermQuery");
288320
assertEquals(childProfile.getLuceneDescription(), "field1:two");
289321
assertThat(childProfile.getTime(), greaterThan(0L));
290322
assertNotNull(childProfile.getTimeBreakdown());
323+
assertQueryProfileResult(childProfile);
291324
}
292325

293326
CollectorResult result = searchProfiles.getCollectorResult();
@@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
330363
assertNotNull(result.getLuceneDescription());
331364
assertThat(result.getTime(), greaterThan(0L));
332365
assertNotNull(result.getTimeBreakdown());
366+
assertQueryProfileResult(result);
333367
}
334368

335369
CollectorResult result = searchProfiles.getCollectorResult();
@@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
375409
assertNotNull(result.getLuceneDescription());
376410
assertThat(result.getTime(), greaterThan(0L));
377411
assertNotNull(result.getTimeBreakdown());
412+
assertQueryProfileResult(result);
378413
}
379414

380415
CollectorResult result = searchProfiles.getCollectorResult();
@@ -415,6 +450,7 @@ public void testBoosting() throws Exception {
415450
assertNotNull(result.getLuceneDescription());
416451
assertThat(result.getTime(), greaterThan(0L));
417452
assertNotNull(result.getTimeBreakdown());
453+
assertQueryProfileResult(result);
418454
}
419455

420456
CollectorResult result = searchProfiles.getCollectorResult();
@@ -455,6 +491,7 @@ public void testDisMaxRange() throws Exception {
455491
assertNotNull(result.getLuceneDescription());
456492
assertThat(result.getTime(), greaterThan(0L));
457493
assertNotNull(result.getTimeBreakdown());
494+
assertQueryProfileResult(result);
458495
}
459496

460497
CollectorResult result = searchProfiles.getCollectorResult();
@@ -494,6 +531,7 @@ public void testRange() throws Exception {
494531
assertNotNull(result.getLuceneDescription());
495532
assertThat(result.getTime(), greaterThan(0L));
496533
assertNotNull(result.getTimeBreakdown());
534+
assertQueryProfileResult(result);
497535
}
498536

499537
CollectorResult result = searchProfiles.getCollectorResult();
@@ -547,6 +585,7 @@ public void testPhrase() throws Exception {
547585
assertNotNull(result.getLuceneDescription());
548586
assertThat(result.getTime(), greaterThan(0L));
549587
assertNotNull(result.getTimeBreakdown());
588+
assertQueryProfileResult(result);
550589
}
551590

552591
CollectorResult result = searchProfiles.getCollectorResult();
@@ -579,4 +618,35 @@ public void testNoProfile() throws Exception {
579618
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
580619
}
581620

621+
private void assertQueryProfileResult(ProfileResult result) {
622+
Map<String, Long> breakdown = result.getTimeBreakdown();
623+
Long maxSliceTime = result.getMaxSliceTime();
624+
Long minSliceTime = result.getMinSliceTime();
625+
Long avgSliceTime = result.getAvgSliceTime();
626+
if (concurrentSearchEnabled) {
627+
assertNotNull(maxSliceTime);
628+
assertNotNull(minSliceTime);
629+
assertNotNull(avgSliceTime);
630+
assertThat(breakdown.size(), equalTo(66));
631+
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
632+
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
633+
String maxTimingType = MAX_PREFIX + queryTimingType;
634+
String minTimingType = MIN_PREFIX + queryTimingType;
635+
String avgTimingType = AVG_PREFIX + queryTimingType;
636+
assertNotNull(breakdown.get(maxTimingType));
637+
assertNotNull(breakdown.get(minTimingType));
638+
assertNotNull(breakdown.get(avgTimingType));
639+
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
640+
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
641+
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
642+
}
643+
}
644+
} else {
645+
assertThat(maxSliceTime, is(nullValue()));
646+
assertThat(minSliceTime, is(nullValue()));
647+
assertThat(avgSliceTime, is(nullValue()));
648+
assertThat(breakdown.size(), equalTo(27));
649+
}
650+
}
651+
582652
}

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.opensearch.search.dfs.AggregatedDfs;
7272
import org.opensearch.search.profile.ContextualProfileBreakdown;
7373
import org.opensearch.search.profile.Timer;
74+
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
7475
import org.opensearch.search.profile.query.ProfileWeight;
7576
import org.opensearch.search.profile.query.QueryProfiler;
7677
import org.opensearch.search.profile.query.QueryTimingType;
@@ -106,6 +107,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
106107
private QueryProfiler profiler;
107108
private MutableQueryTimeout cancellable;
108109
private SearchContext searchContext;
110+
private boolean fromConcurrentPath = false;
109111

110112
public ContextIndexSearcher(
111113
IndexReader reader,
@@ -185,15 +187,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
185187

186188
@Override
187189
public Query rewrite(Query original) throws IOException {
190+
Timer concurrentPathRewriteTimer = null;
188191
if (profiler != null) {
189-
profiler.startRewriteTime();
192+
if (fromConcurrentPath) {
193+
concurrentPathRewriteTimer = new Timer();
194+
profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer);
195+
concurrentPathRewriteTimer.start();
196+
} else {
197+
profiler.startRewriteTime();
198+
}
190199
}
191200

192201
try {
193202
return super.rewrite(original);
194203
} finally {
195204
if (profiler != null) {
196-
profiler.stopAndAddRewriteTime();
205+
if (fromConcurrentPath) {
206+
concurrentPathRewriteTimer.stop();
207+
} else {
208+
profiler.stopAndAddRewriteTime();
209+
}
197210
}
198211
}
199212
}
@@ -204,7 +217,15 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
204217
// createWeight() is called for each query in the tree, so we tell the queryProfiler
205218
// each invocation so that it can build an internal representation of the query
206219
// tree
207-
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
220+
ContextualProfileBreakdown<QueryTimingType> profile;
221+
if (searchContext.shouldUseConcurrentSearch()) {
222+
long threadId = Thread.currentThread().getId();
223+
ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree()
224+
.computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree());
225+
profile = profileTree.getProfileBreakdown(query);
226+
} else {
227+
profile = profiler.getQueryBreakdown(query);
228+
}
208229
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
209230
timer.start();
210231
final Weight weight;
@@ -267,6 +288,9 @@ public void search(
267288

268289
@Override
269290
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
291+
if (searchContext.shouldUseConcurrentSearch()) {
292+
fromConcurrentPath = true;
293+
}
270294
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
271295
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
272296
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf

server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232

3333
package org.opensearch.search.profile;
3434

35+
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
36+
37+
import java.util.ArrayList;
3538
import java.util.List;
39+
import java.util.Map;
3640

3741
/**
3842
* Base class for a profiler
@@ -42,6 +46,7 @@
4246
public class AbstractProfiler<PB extends AbstractProfileBreakdown<?>, E> {
4347

4448
protected final AbstractInternalProfileTree<PB, E> profileTree;
49+
protected Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;
4550

4651
public AbstractProfiler(AbstractInternalProfileTree<PB, E> profileTree) {
4752
this.profileTree = profileTree;
@@ -59,14 +64,27 @@ public PB getQueryBreakdown(E query) {
5964
* Removes the last (e.g. most recent) element on the stack.
6065
*/
6166
public void pollLastElement() {
62-
profileTree.pollLast();
67+
if (threadToProfileTree == null) {
68+
profileTree.pollLast();
69+
} else {
70+
long threadId = Thread.currentThread().getId();
71+
ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId);
72+
concurrentProfileTree.pollLast();
73+
}
6374
}
6475

6576
/**
6677
* @return a hierarchical representation of the profiled tree
6778
*/
6879
public List<ProfileResult> getTree() {
69-
return profileTree.getTree();
80+
if (threadToProfileTree == null) {
81+
return profileTree.getTree();
82+
}
83+
List<ProfileResult> profileResults = new ArrayList<>();
84+
for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
85+
profileResults.addAll(profile.getValue().getTree());
86+
}
87+
return profileResults;
7088
}
7189

7290
}

server/src/main/java/org/opensearch/search/profile/Timer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ public class Timer {
5353
private boolean doTiming;
5454
private long timing, count, lastCount, start, earliestTimerStartTime;
5555

/** Creates a timer with all of its counters and timestamps set to zero. */
public Timer() {
    this(0L, 0L, 0L, 0L, 0L);
}
59+
/**
 * Creates a timer whose internal state is initialised from the given values.
 *
 * @param timing                 initial value of the {@code timing} field
 * @param count                  initial value of the {@code count} field
 * @param lastCount              initial value of the {@code lastCount} field
 * @param start                  initial value of the {@code start} field
 * @param earliestTimerStartTime initial value of the {@code earliestTimerStartTime} field
 */
public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
    this.earliestTimerStartTime = earliestTimerStartTime;
    this.start = start;
    this.lastCount = lastCount;
    this.count = count;
    this.timing = timing;
}
67+
5668
/** pkg-private for testing */
5769
long nanoTime() {
5870
return System.nanoTime();

server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
import org.apache.lucene.search.Collector;
1313
import org.opensearch.search.profile.ContextualProfileBreakdown;
1414
import org.opensearch.search.profile.ProfileResult;
15+
import org.opensearch.search.profile.Timer;
1516

17+
import java.util.ArrayList;
1618
import java.util.List;
1719
import java.util.Map;
1820

@@ -22,6 +24,7 @@
2224
* @opensearch.internal
2325
*/
2426
public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree {
27+
protected List<Timer> concurrentPathRewriteTimers = new ArrayList<>();
2528

2629
@Override
2730
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
@@ -88,4 +91,11 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map<
8891
}
8992
}
9093
}
94+
95+
/**
96+
* @return the concurrent path rewrite timer list for this profile tree
97+
*/
98+
public List<Timer> getConcurrentPathRewriteTimers() {
99+
return concurrentPathRewriteTimers;
100+
}
91101
}

0 commit comments

Comments
 (0)