Skip to content

Commit 3e56548

Browse files
committed
Coordinator can return partial results after the timeout when allow_partial_search_results is true
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent c82cd2e commit 3e56548

File tree

11 files changed

+179
-9
lines changed

11 files changed

+179
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2121
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
2222
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
2323
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
24+
- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)).
2425

2526
### Dependencies
2627
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))

server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ public static void readMultiLineFormat(
281281
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
282282
} else if ("phase_took".equals(entry.getKey())) {
283283
searchRequest.setPhaseTook(nodeBooleanValue(value));
284+
} else if ("coordinator_timeout".equals(entry.getKey())) {
285+
searchRequest.setCoordinatorTimeout(nodeTimeValue(value));
284286
} else {
285287
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
286288
}
@@ -385,6 +387,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
385387
if (request.isPhaseTook() != null) {
386388
xContentBuilder.field("phase_took", request.isPhaseTook());
387389
}
390+
if (request.getCoordinatorTimeout() != null) {
391+
xContentBuilder.field("coordinator_timeout", request.getCoordinatorTimeout().getStringRep());
392+
}
388393
xContentBuilder.endObject();
389394
}
390395

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Objects;
6060

6161
import static org.opensearch.action.ValidateActions.addValidationError;
62+
import static org.opensearch.search.SearchService.NO_TIMEOUT;
6263

6364
/**
6465
* A request to execute search against one or more indices (or all). Best created using
@@ -123,6 +124,9 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
123124

124125
private Boolean phaseTook = null;
125126

127+
// it's only been used in coordinator, so we don't need to serialize/deserialize it
128+
private TimeValue coordinatorTimeout = null;
129+
126130
public SearchRequest() {
127131
this.localClusterAlias = null;
128132
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
@@ -228,6 +232,7 @@ private SearchRequest(
228232
this.finalReduce = finalReduce;
229233
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
230234
this.phaseTook = searchRequest.phaseTook;
235+
this.coordinatorTimeout = searchRequest.coordinatorTimeout;
231236
}
232237

233238
/**
@@ -275,6 +280,7 @@ public SearchRequest(StreamInput in) throws IOException {
275280
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
276281
phaseTook = in.readOptionalBoolean();
277282
}
283+
coordinatorTimeout = null;
278284
}
279285

280286
@Override
@@ -341,6 +347,13 @@ public ActionRequestValidationException validate() {
341347
if (source.aggregations() != null) {
342348
validationException = source.aggregations().validate(validationException);
343349
}
350+
if (source.timeout() != null && coordinatorTimeout != null && source.timeout().compareTo(coordinatorTimeout) < 0) {
351+
validationException = addValidationError(
352+
"timeout [" + source.timeout() + "] cannot be smaller than coordinator timeout [" + coordinatorTimeout + "]",
353+
validationException
354+
);
355+
356+
}
344357
}
345358
if (pointInTimeBuilder() != null) {
346359
if (scroll) {
@@ -711,9 +724,18 @@ public String pipeline() {
711724
return pipeline;
712725
}
713726

727+
public void setCoordinatorTimeout(TimeValue coordinatorTimeout) {
728+
assert coordinatorTimeout != NO_TIMEOUT;
729+
this.coordinatorTimeout = coordinatorTimeout;
730+
}
731+
732+
public TimeValue getCoordinatorTimeout() {
733+
return coordinatorTimeout;
734+
}
735+
714736
@Override
715737
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
716-
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
738+
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval, coordinatorTimeout);
717739
}
718740

719741
public final String buildDescription() {
@@ -765,7 +787,8 @@ public boolean equals(Object o) {
765787
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
766788
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
767789
&& Objects.equals(pipeline, that.pipeline)
768-
&& Objects.equals(phaseTook, that.phaseTook);
790+
&& Objects.equals(phaseTook, that.phaseTook)
791+
&& Objects.equals(coordinatorTimeout, that.coordinatorTimeout);
769792
}
770793

771794
@Override
@@ -787,7 +810,8 @@ public int hashCode() {
787810
absoluteStartMillis,
788811
ccsMinimizeRoundtrips,
789812
cancelAfterTimeInterval,
790-
phaseTook
813+
phaseTook,
814+
coordinatorTimeout
791815
);
792816
}
793817

@@ -832,6 +856,8 @@ public String toString() {
832856
+ pipeline
833857
+ ", phaseTook="
834858
+ phaseTook
859+
+ ", coordinatorTimeout="
860+
+ coordinatorTimeout
835861
+ "}";
836862
}
837863
}

server/src/main/java/org/opensearch/action/search/SearchTask.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
5353
// generating description in a lazy way since source can be quite big
5454
private final Supplier<String> descriptionSupplier;
5555
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
56+
private final TimeValue coordinatorTimeout;
5657

5758
public SearchTask(
5859
long id,
@@ -62,7 +63,7 @@ public SearchTask(
6263
TaskId parentTaskId,
6364
Map<String, String> headers
6465
) {
65-
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
66+
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null);
6667
}
6768

6869
public SearchTask(
@@ -72,10 +73,12 @@ public SearchTask(
7273
Supplier<String> descriptionSupplier,
7374
TaskId parentTaskId,
7475
Map<String, String> headers,
75-
TimeValue cancelAfterTimeInterval
76+
TimeValue cancelAfterTimeInterval,
77+
TimeValue coordinatorTimeout
7678
) {
7779
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
7880
this.descriptionSupplier = descriptionSupplier;
81+
this.coordinatorTimeout = coordinatorTimeout;
7982
}
8083

8184
@Override
@@ -106,4 +109,8 @@ public final SearchProgressListener getProgressListener() {
106109
public boolean shouldCancelChildrenOnCancellation() {
107110
return true;
108111
}
112+
113+
public TimeValue getCoordinatorTimeout() {
114+
return coordinatorTimeout;
115+
}
109116
}

server/src/main/java/org/opensearch/action/search/SearchTransportService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.action.support.IndicesOptions;
4040
import org.opensearch.cluster.node.DiscoveryNode;
4141
import org.opensearch.common.Nullable;
42+
import org.opensearch.common.unit.TimeValue;
4243
import org.opensearch.common.util.concurrent.ConcurrentCollections;
4344
import org.opensearch.core.action.ActionListener;
4445
import org.opensearch.core.common.io.stream.StreamInput;
@@ -172,7 +173,7 @@ public void createPitContext(
172173
CREATE_READER_CONTEXT_ACTION_NAME,
173174
request,
174175
task,
175-
TransportRequestOptions.EMPTY,
176+
getTransportRequestOptions(task.getCoordinatorTimeout()),
176177
new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new)
177178
);
178179
}
@@ -188,7 +189,7 @@ public void sendCanMatch(
188189
QUERY_CAN_MATCH_NAME,
189190
request,
190191
task,
191-
TransportRequestOptions.EMPTY,
192+
getTransportRequestOptions(task.getCoordinatorTimeout()),
192193
new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)
193194
);
194195
}
@@ -228,6 +229,7 @@ public void sendExecuteDfs(
228229
DFS_ACTION_NAME,
229230
request,
230231
task,
232+
getTransportRequestOptions(task.getCoordinatorTimeout()),
231233
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())
232234
);
233235
}
@@ -249,6 +251,7 @@ public void sendExecuteQuery(
249251
QUERY_ACTION_NAME,
250252
request,
251253
task,
254+
getTransportRequestOptions(task.getCoordinatorTimeout()),
252255
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
253256
);
254257
}
@@ -264,6 +267,7 @@ public void sendExecuteQuery(
264267
QUERY_ID_ACTION_NAME,
265268
request,
266269
task,
270+
getTransportRequestOptions(task.getCoordinatorTimeout()),
267271
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())
268272
);
269273
}
@@ -279,6 +283,7 @@ public void sendExecuteScrollQuery(
279283
QUERY_SCROLL_ACTION_NAME,
280284
request,
281285
task,
286+
getTransportRequestOptions(task.getCoordinatorTimeout()),
282287
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())
283288
);
284289
}
@@ -328,6 +333,7 @@ private void sendExecuteFetch(
328333
action,
329334
request,
330335
task,
336+
getTransportRequestOptions(task.getCoordinatorTimeout()),
331337
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())
332338
);
333339
}
@@ -342,10 +348,19 @@ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, f
342348
MultiSearchAction.NAME,
343349
request,
344350
task,
351+
getTransportRequestOptions(task.getCoordinatorTimeout()),
345352
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())
346353
);
347354
}
348355

356+
static TransportRequestOptions getTransportRequestOptions(TimeValue coordinatorTimeout) {
357+
if (coordinatorTimeout != null) {
358+
return TransportRequestOptions.builder().withTimeout(coordinatorTimeout).build();
359+
} else {
360+
return TransportRequestOptions.EMPTY;
361+
}
362+
}
363+
349364
public RemoteClusterService getRemoteClusterService() {
350365
return transportService.getRemoteClusterService();
351366
}

server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public static MultiSearchRequest parseRequest(
157157
multiRequest.add(searchRequest);
158158
});
159159
List<SearchRequest> requests = multiRequest.requests();
160+
final TimeValue coordinatorTimeout = restRequest.paramAsTime("coordinator_timeout", null);
160161
final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null);
161162
for (SearchRequest request : requests) {
162163
// preserve if it's set on the request
@@ -171,6 +172,9 @@ public static MultiSearchRequest parseRequest(
171172
if (request.getCancelAfterTimeInterval() == null) {
172173
request.setCancelAfterTimeInterval(cancelAfterTimeInterval);
173174
}
175+
if (request.getCoordinatorTimeout() == null) {
176+
request.setCoordinatorTimeout(coordinatorTimeout);
177+
}
174178
}
175179
return multiRequest;
176180
}

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ public static void parseSearchRequest(
224224
}
225225

226226
searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));
227+
searchRequest.setCoordinatorTimeout(request.paramAsTime("coordinator_timeout", null));
227228
}
228229

229230
/**

0 commit comments

Comments
 (0)