Skip to content

Commit f2cb9f7

Browse files
committed
Coordinator can return partial results after the timeout when allow_partial_search_results is true
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent b1a7743 commit f2cb9f7

File tree

8 files changed

+280
-9
lines changed

8 files changed

+280
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2121
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
2222
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
2323
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
24+
- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)).
2425

2526
### Dependencies
2627
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
8484

8585
private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;
8686

87+
public static final float DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE = 0.8f;
88+
8789
private final String localClusterAlias;
8890
private final long absoluteStartMillis;
8991
private final boolean finalReduce;
@@ -123,10 +125,16 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
123125

124126
private Boolean phaseTook = null;
125127

128+
// Only used on the coordinator node, so it does not need to be serialized/deserialized.
129+
private long startTimeMills;
130+
131+
private float queryPhaseTimeoutPercentage = 0.8f;
132+
126133
/**
 * Creates an empty request with no local cluster alias, the default absolute start time and
 * final reduction enabled. The wall-clock creation time is recorded in {@code startTimeMills}.
 */
public SearchRequest() {
    // Stamp the creation time; it is later handed to the SearchTask built by createTask.
    this.startTimeMills = System.currentTimeMillis();
    this.finalReduce = true;
    this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
    this.localClusterAlias = null;
}
131139

132140
/**
@@ -228,6 +236,8 @@ private SearchRequest(
228236
this.finalReduce = finalReduce;
229237
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
230238
this.phaseTook = searchRequest.phaseTook;
239+
this.startTimeMills = searchRequest.startTimeMills;
240+
this.queryPhaseTimeoutPercentage = searchRequest.queryPhaseTimeoutPercentage;
231241
}
232242

233243
/**
@@ -275,6 +285,7 @@ public SearchRequest(StreamInput in) throws IOException {
275285
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
276286
phaseTook = in.readOptionalBoolean();
277287
}
288+
startTimeMills = -1;
278289
}
279290

280291
@Override
@@ -347,6 +358,10 @@ public ActionRequestValidationException validate() {
347358
validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
348359
}
349360
}
361+
362+
if (queryPhaseTimeoutPercentage <= 0 || queryPhaseTimeoutPercentage > 1) {
363+
validationException = addValidationError("[query_phase_timeout_percentage] must be in (0, 1]", validationException);
364+
}
350365
return validationException;
351366
}
352367

@@ -711,9 +726,31 @@ public String pipeline() {
711726
return pipeline;
712727
}
713728

729+
public void setQueryPhaseTimeoutPercentage(float queryPhaseTimeoutPercentage) {
730+
if (source.timeout() == null) {
731+
throw new IllegalArgumentException("timeout must be set before setting queryPhaseTimeoutPercentage");
732+
}
733+
if (source.size() == 0) {
734+
this.queryPhaseTimeoutPercentage = 1;
735+
} else {
736+
this.queryPhaseTimeoutPercentage = queryPhaseTimeoutPercentage;
737+
}
738+
}
739+
714740
@Override
715741
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
716-
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
742+
return new SearchTask(
743+
id,
744+
type,
745+
action,
746+
this::buildDescription,
747+
parentTaskId,
748+
headers,
749+
cancelAfterTimeInterval,
750+
startTimeMills,
751+
(source != null && source.timeout() != null) ? source.timeout().millis() : -1,
752+
queryPhaseTimeoutPercentage
753+
);
717754
}
718755

719756
public final String buildDescription() {
@@ -765,7 +802,8 @@ public boolean equals(Object o) {
765802
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
766803
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
767804
&& Objects.equals(pipeline, that.pipeline)
768-
&& Objects.equals(phaseTook, that.phaseTook);
805+
&& Objects.equals(phaseTook, that.phaseTook)
806+
&& Objects.equals(queryPhaseTimeoutPercentage, that.queryPhaseTimeoutPercentage);
769807
}
770808

771809
@Override
@@ -787,7 +825,8 @@ public int hashCode() {
787825
absoluteStartMillis,
788826
ccsMinimizeRoundtrips,
789827
cancelAfterTimeInterval,
790-
phaseTook
828+
phaseTook,
829+
queryPhaseTimeoutPercentage
791830
);
792831
}
793832

@@ -832,6 +871,8 @@ public String toString() {
832871
+ pipeline
833872
+ ", phaseTook="
834873
+ phaseTook
874+
+ ", queryPhaseTimeoutPercentage="
875+
+ queryPhaseTimeoutPercentage
835876
+ "}";
836877
}
837878
}

server/src/main/java/org/opensearch/action/search/SearchTask.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
5353
// generating description in a lazy way since source can be quite big
5454
private final Supplier<String> descriptionSupplier;
5555
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
56+
private final long startTimeMills;
57+
private final long timeoutMills;
58+
private final float queryPhaseTimeoutPercentage;
5659

5760
public SearchTask(
5861
long id,
@@ -62,7 +65,7 @@ public SearchTask(
6265
TaskId parentTaskId,
6366
Map<String, String> headers
6467
) {
65-
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
68+
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, -1, -1, 0.8f);
6669
}
6770

6871
public SearchTask(
@@ -72,10 +75,17 @@ public SearchTask(
7275
Supplier<String> descriptionSupplier,
7376
TaskId parentTaskId,
7477
Map<String, String> headers,
75-
TimeValue cancelAfterTimeInterval
78+
TimeValue cancelAfterTimeInterval,
79+
long startTimeMills,
80+
long timeoutMills,
81+
float queryPhaseTimeoutPercentage
7682
) {
7783
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
7884
this.descriptionSupplier = descriptionSupplier;
85+
this.startTimeMills = startTimeMills;
86+
this.timeoutMills = timeoutMills;
87+
assert queryPhaseTimeoutPercentage > 0 && queryPhaseTimeoutPercentage <= 1;
88+
this.queryPhaseTimeoutPercentage = queryPhaseTimeoutPercentage;
7989
}
8090

8191
@Override
@@ -106,4 +116,16 @@ public final SearchProgressListener getProgressListener() {
106116
public boolean shouldCancelChildrenOnCancellation() {
107117
return true;
108118
}
119+
120+
public long startTimeMills() {
121+
return startTimeMills;
122+
}
123+
124+
public long timeoutMills() {
125+
return timeoutMills;
126+
}
127+
128+
public long queryPhaseTimeout() {
129+
return (long) (timeoutMills * queryPhaseTimeoutPercentage);
130+
}
109131
}

server/src/main/java/org/opensearch/action/search/SearchTransportService.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.core.common.io.stream.StreamInput;
4545
import org.opensearch.core.common.io.stream.StreamOutput;
4646
import org.opensearch.core.common.io.stream.Writeable;
47+
import org.opensearch.core.tasks.TaskCancelledException;
4748
import org.opensearch.core.transport.TransportResponse;
4849
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
4950
import org.opensearch.search.SearchPhaseResult;
@@ -76,6 +77,7 @@
7677
import java.util.Map;
7778
import java.util.Objects;
7879
import java.util.function.BiFunction;
80+
import java.util.function.Consumer;
7981

8082
/**
8183
* An encapsulation of {@link org.opensearch.search.SearchService} operations exposed through
@@ -167,12 +169,18 @@ public void createPitContext(
167169
SearchTask task,
168170
ActionListener<TransportCreatePitAction.CreateReaderContextResponse> actionListener
169171
) {
172+
173+
TransportRequestOptions options = getTransportRequestOptions(task, actionListener::onFailure, false);
174+
if (options == null) {
175+
return;
176+
}
177+
170178
transportService.sendChildRequest(
171179
connection,
172180
CREATE_READER_CONTEXT_ACTION_NAME,
173181
request,
174182
task,
175-
TransportRequestOptions.EMPTY,
183+
options,
176184
new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new)
177185
);
178186
}
@@ -183,12 +191,18 @@ public void sendCanMatch(
183191
SearchTask task,
184192
final ActionListener<SearchService.CanMatchResponse> listener
185193
) {
194+
195+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
196+
if (options == null) {
197+
return;
198+
}
199+
186200
transportService.sendChildRequest(
187201
connection,
188202
QUERY_CAN_MATCH_NAME,
189203
request,
190204
task,
191-
TransportRequestOptions.EMPTY,
205+
options,
192206
new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)
193207
);
194208
}
@@ -223,11 +237,18 @@ public void sendExecuteDfs(
223237
SearchTask task,
224238
final SearchActionListener<DfsSearchResult> listener
225239
) {
240+
241+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
242+
if (options == null) {
243+
return;
244+
}
245+
226246
transportService.sendChildRequest(
227247
connection,
228248
DFS_ACTION_NAME,
229249
request,
230250
task,
251+
options,
231252
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())
232253
);
233254
}
@@ -243,12 +264,18 @@ public void sendExecuteQuery(
243264
final boolean fetchDocuments = request.numberOfShards() == 1;
244265
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
245266

267+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
268+
if (options == null) {
269+
return;
270+
}
271+
246272
final ActionListener handler = responseWrapper.apply(connection, listener);
247273
transportService.sendChildRequest(
248274
connection,
249275
QUERY_ACTION_NAME,
250276
request,
251277
task,
278+
options,
252279
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
253280
);
254281
}
@@ -259,11 +286,18 @@ public void sendExecuteQuery(
259286
SearchTask task,
260287
final SearchActionListener<QuerySearchResult> listener
261288
) {
289+
290+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, true);
291+
if (options == null) {
292+
return;
293+
}
294+
262295
transportService.sendChildRequest(
263296
connection,
264297
QUERY_ID_ACTION_NAME,
265298
request,
266299
task,
300+
options,
267301
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())
268302
);
269303
}
@@ -274,11 +308,18 @@ public void sendExecuteScrollQuery(
274308
SearchTask task,
275309
final SearchActionListener<ScrollQuerySearchResult> listener
276310
) {
311+
312+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
313+
if (options == null) {
314+
return;
315+
}
316+
277317
transportService.sendChildRequest(
278318
connection,
279319
QUERY_SCROLL_ACTION_NAME,
280320
request,
281321
task,
322+
options,
282323
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())
283324
);
284325
}
@@ -323,11 +364,17 @@ private void sendExecuteFetch(
323364
SearchTask task,
324365
final SearchActionListener<FetchSearchResult> listener
325366
) {
367+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
368+
if (options == null) {
369+
return;
370+
}
371+
326372
transportService.sendChildRequest(
327373
connection,
328374
action,
329375
request,
330376
task,
377+
options,
331378
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())
332379
);
333380
}
@@ -337,15 +384,42 @@ private void sendExecuteFetch(
337384
*/
338385
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, final ActionListener<MultiSearchResponse> listener) {
339386
final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode());
387+
388+
TransportRequestOptions options = getTransportRequestOptions(task, listener::onFailure, false);
389+
if (options == null) {
390+
return;
391+
}
392+
340393
transportService.sendChildRequest(
341394
connection,
342395
MultiSearchAction.NAME,
343396
request,
344397
task,
398+
options,
345399
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())
346400
);
347401
}
348402

403+
static TransportRequestOptions getTransportRequestOptions(SearchTask task, Consumer<Exception> onFailure, boolean queryPhase) {
404+
if (task != null && task.timeoutMills() > 0) {
405+
long leftTimeMills;
406+
if (queryPhase) {
407+
// it's costly in query phase.
408+
leftTimeMills = task.queryPhaseTimeout() - (System.currentTimeMillis() - task.startTimeMills());
409+
} else {
410+
leftTimeMills = task.timeoutMills() - (System.currentTimeMillis() - task.startTimeMills());
411+
}
412+
if (leftTimeMills <= 0) {
413+
onFailure.accept(new TaskCancelledException("failed to execute fetch phase, timeout exceeded" + leftTimeMills + "ms"));
414+
return null;
415+
} else {
416+
return TransportRequestOptions.builder().withTimeout(leftTimeMills).build();
417+
}
418+
} else {
419+
return TransportRequestOptions.EMPTY;
420+
}
421+
}
422+
349423
public RemoteClusterService getRemoteClusterService() {
350424
return transportService.getRemoteClusterService();
351425
}

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,12 @@ public static void parseSearchRequest(
224224
}
225225

226226
searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));
227+
228+
if (request.hasParam("query_phase_timeout_percentage")) {
229+
searchRequest.setQueryPhaseTimeoutPercentage(
230+
request.paramAsFloat("query_phase_timeout_percentage", SearchRequest.DEFAULT_QUERY_PHASE_TIMEOUT_PERCENTAGE)
231+
);
232+
}
227233
}
228234

229235
/**

0 commit comments

Comments
 (0)