Skip to content

Commit bc3dffb

Browse files
aabukhalilBukhtawar
authored andcommitted
Fix false positive query timeouts due to using cached time (#3454)
* Fix false positive query timeouts due to using cached time Signed-off-by: Ahmad AbuKhalil <abukhali@amazon.com> * delegate nanoTime call to SearchContext Signed-off-by: Ahmad AbuKhalil <abukhali@amazon.com> * add override to SearchContext getRelativeTimeInMillis to force non cached time Signed-off-by: Ahmad AbuKhalil <abukhali@amazon.com>
1 parent 5b3a164 commit bc3dffb

File tree

5 files changed

+143
-10
lines changed

5 files changed

+143
-10
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.benchmark.time;
10+
11+
import org.openjdk.jmh.annotations.*;
12+
13+
import java.util.concurrent.TimeUnit;
14+
15+
@Fork(3)
16+
@Warmup(iterations = 10)
17+
@Measurement(iterations = 20)
18+
@BenchmarkMode(Mode.AverageTime)
19+
@OutputTimeUnit(TimeUnit.NANOSECONDS)
20+
@State(Scope.Benchmark)
21+
@SuppressWarnings("unused") // invoked by benchmarking framework
22+
public class NanoTimeVsCurrentTimeMillisBenchmark {
23+
private volatile long var = 0;
24+
25+
@Benchmark
26+
public long currentTimeMillis() {
27+
return System.currentTimeMillis();
28+
}
29+
30+
@Benchmark
31+
public long nanoTime() {
32+
return System.nanoTime();
33+
}
34+
35+
/*
36+
* this acts as upper bound of how time is cached in org.opensearch.threadpool.ThreadPool
37+
* */
38+
@Benchmark
39+
public long accessLongVar() {
40+
return var++;
41+
}
42+
}

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,24 @@ public final boolean hasOnlySuggest() {
388388

389389
/**
390390
* Returns time in milliseconds that can be used for relative time calculations.
391-
* WARN: This is not the epoch time.
391+
* WARN: This is not the epoch time and can be a cached time.
392392
*/
393393
public abstract long getRelativeTimeInMillis();
394394

395+
/**
396+
* Returns time in milliseconds that can be used for relative time calculations. this method will fall back to
397+
* {@link SearchContext#getRelativeTimeInMillis()} (which might be a cached time) if useCache was set to true else it will be just be a
398+
* wrapper of {@link System#nanoTime()} converted to milliseconds.
399+
* @param useCache to allow using cached time if true or forcing calling {@link System#nanoTime()} if false
400+
* @return Returns time in milliseconds that can be used for relative time calculations.
401+
*/
402+
public long getRelativeTimeInMillis(boolean useCache) {
403+
if (useCache) {
404+
return getRelativeTimeInMillis();
405+
}
406+
return TimeValue.nsecToMSec(System.nanoTime());
407+
}
408+
395409
/** Return a view of the additional query collector managers that should be run for this context. */
396410
public abstract Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers();
397411

server/src/main/java/org/opensearch/search/query/QueryPhase.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -257,15 +257,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
257257

258258
final Runnable timeoutRunnable;
259259
if (timeoutSet) {
260-
final long startTime = searchContext.getRelativeTimeInMillis();
261-
final long timeout = searchContext.timeout().millis();
262-
final long maxTime = startTime + timeout;
263-
timeoutRunnable = searcher.addQueryCancellation(() -> {
264-
final long time = searchContext.getRelativeTimeInMillis();
265-
if (time > maxTime) {
266-
throw new TimeExceededException();
267-
}
268-
});
260+
timeoutRunnable = searcher.addQueryCancellation(createQueryTimeoutChecker(searchContext));
269261
} else {
270262
timeoutRunnable = null;
271263
}
@@ -309,6 +301,28 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
309301
}
310302
}
311303

304+
/**
305+
* Create runnable which throws {@link TimeExceededException} when the runnable is called after timeout + runnable creation time
306+
* exceeds currentTime
307+
* @param searchContext to extract timeout from and to get relative time from
308+
* @return the created runnable
309+
*/
310+
static Runnable createQueryTimeoutChecker(final SearchContext searchContext) {
311+
/* for startTime, relative non-cached precise time must be used to prevent false positive timeouts.
312+
* Using cached time for startTime will fail and produce false positive timeouts when maxTime = (startTime + timeout) falls in
313+
* next time cache slot(s) AND time caching lifespan > passed timeout */
314+
final long startTime = searchContext.getRelativeTimeInMillis(false);
315+
final long maxTime = startTime + searchContext.timeout().millis();
316+
return () -> {
317+
/* As long as startTime is non cached time, using cached time here might only produce false negative timeouts within the time
318+
* cache life span which is acceptable */
319+
final long time = searchContext.getRelativeTimeInMillis();
320+
if (time > maxTime) {
321+
throw new TimeExceededException();
322+
}
323+
};
324+
}
325+
312326
private static boolean searchWithCollector(
313327
SearchContext searchContext,
314328
ContextIndexSearcher searcher,

server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
368368
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
369369
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
370370
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
371+
// make sure getPreciseRelativeTimeInMillis is same as System.nanoTime()
372+
long timeToleranceInMs = 10;
373+
long currTime = TimeValue.nsecToMSec(System.nanoTime());
374+
assertTrue(Math.abs(context3.getRelativeTimeInMillis(false) - currTime) <= timeToleranceInMs);
371375

372376
when(queryShardContext.getIndexSettings()).thenReturn(indexSettings);
373377
when(queryShardContext.fieldMapper(anyString())).thenReturn(mock(MappedFieldType.class));

server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.lucene.util.FixedBitSet;
8686
import org.opensearch.action.search.SearchShardTask;
8787
import org.opensearch.common.settings.Settings;
88+
import org.opensearch.common.unit.TimeValue;
8889
import org.opensearch.index.mapper.DateFieldMapper;
8990
import org.opensearch.index.mapper.MappedFieldType;
9091
import org.opensearch.index.mapper.MapperService;
@@ -105,6 +106,7 @@
105106
import org.opensearch.search.sort.SortAndFormats;
106107
import org.opensearch.tasks.TaskCancelledException;
107108
import org.opensearch.test.TestSearchContext;
109+
import org.opensearch.threadpool.ThreadPool;
108110

109111
import java.io.IOException;
110112
import java.util.ArrayList;
@@ -117,9 +119,14 @@
117119
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
118120
import static org.hamcrest.Matchers.instanceOf;
119121
import static org.hamcrest.Matchers.lessThanOrEqualTo;
122+
import static org.mockito.ArgumentMatchers.eq;
120123
import static org.mockito.Mockito.mock;
121124
import static org.mockito.Mockito.spy;
122125
import static org.mockito.Mockito.when;
126+
import static org.mockito.Mockito.verify;
127+
import static org.mockito.Mockito.times;
128+
import static org.mockito.Mockito.atLeastOnce;
129+
import static org.mockito.Mockito.verifyNoMoreInteractions;
123130
import static org.opensearch.search.query.TopDocsCollectorContext.hasInfMaxScore;
124131

125132
public class QueryPhaseTests extends IndexShardTestCase {
@@ -1079,6 +1086,58 @@ public void testCancellationDuringPreprocess() throws IOException {
10791086
}
10801087
}
10811088

1089+
public void testQueryTimeoutChecker() throws Exception {
1090+
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
1091+
long timeTolerance = timeCacheLifespan / 20;
1092+
1093+
// should throw time exceed exception for sure after timeCacheLifespan*2+timeTolerance (next's next cached time is available)
1094+
assertThrows(
1095+
QueryPhase.TimeExceededException.class,
1096+
() -> createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan * 2 + timeTolerance, true, false)
1097+
);
1098+
1099+
// should not throw time exceed exception after timeCacheLifespan+timeTolerance because new cached time - init time < timeout
1100+
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan + timeTolerance, true, false);
1101+
1102+
// should not throw time exceed exception after timeout < timeCacheLifespan when cached time didn't change
1103+
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 2, timeCacheLifespan / 2 + timeTolerance, false, true);
1104+
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true);
1105+
}
1106+
1107+
private void createTimeoutCheckerThenWaitThenRun(
1108+
long timeout,
1109+
long sleepAfterCreation,
1110+
boolean checkCachedTimeChanged,
1111+
boolean checkCachedTimeHasNotChanged
1112+
) throws Exception {
1113+
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
1114+
long timeTolerance = timeCacheLifespan / 20;
1115+
long currentTimeDiffWithCachedTime = TimeValue.nsecToMSec(System.nanoTime()) - threadPool.relativeTimeInMillis();
1116+
// need to run this test approximately at the start of cached time window
1117+
long timeToAlignTimeWithCachedTimeOffset = timeCacheLifespan - currentTimeDiffWithCachedTime + timeTolerance;
1118+
Thread.sleep(timeToAlignTimeWithCachedTimeOffset);
1119+
1120+
long initialRelativeCachedTime = threadPool.relativeTimeInMillis();
1121+
SearchContext mockedSearchContext = mock(SearchContext.class);
1122+
when(mockedSearchContext.timeout()).thenReturn(TimeValue.timeValueMillis(timeout));
1123+
when(mockedSearchContext.getRelativeTimeInMillis()).thenAnswer(invocation -> threadPool.relativeTimeInMillis());
1124+
when(mockedSearchContext.getRelativeTimeInMillis(eq(false))).thenCallRealMethod();
1125+
Runnable queryTimeoutChecker = QueryPhase.createQueryTimeoutChecker(mockedSearchContext);
1126+
// make sure next time slot become available
1127+
Thread.sleep(sleepAfterCreation);
1128+
if (checkCachedTimeChanged) {
1129+
assertNotEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
1130+
}
1131+
if (checkCachedTimeHasNotChanged) {
1132+
assertEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
1133+
}
1134+
queryTimeoutChecker.run();
1135+
verify(mockedSearchContext, times(1)).timeout();
1136+
verify(mockedSearchContext, times(1)).getRelativeTimeInMillis(eq(false));
1137+
verify(mockedSearchContext, atLeastOnce()).getRelativeTimeInMillis();
1138+
verifyNoMoreInteractions(mockedSearchContext);
1139+
}
1140+
10821141
private static class TestSearchContextWithRewriteAndCancellation extends TestSearchContext {
10831142

10841143
private TestSearchContextWithRewriteAndCancellation(

0 commit comments

Comments
 (0)