44
44
import org .opensearch .core .common .io .stream .StreamInput ;
45
45
import org .opensearch .core .common .io .stream .StreamOutput ;
46
46
import org .opensearch .core .common .io .stream .Writeable ;
47
+ import org .opensearch .core .tasks .TaskCancelledException ;
47
48
import org .opensearch .core .transport .TransportResponse ;
48
49
import org .opensearch .ratelimitting .admissioncontrol .enums .AdmissionControlActionType ;
49
50
import org .opensearch .search .SearchPhaseResult ;
76
77
import java .util .Map ;
77
78
import java .util .Objects ;
78
79
import java .util .function .BiFunction ;
80
+ import java .util .function .Consumer ;
79
81
80
82
/**
81
83
* An encapsulation of {@link org.opensearch.search.SearchService} operations exposed through
@@ -167,12 +169,18 @@ public void createPitContext(
167
169
SearchTask task ,
168
170
ActionListener <TransportCreatePitAction .CreateReaderContextResponse > actionListener
169
171
) {
172
+
173
+ TransportRequestOptions options = getTransportRequestOptions (task , actionListener ::onFailure , false );
174
+ if (options == null ) {
175
+ return ;
176
+ }
177
+
170
178
transportService .sendChildRequest (
171
179
connection ,
172
180
CREATE_READER_CONTEXT_ACTION_NAME ,
173
181
request ,
174
182
task ,
175
- TransportRequestOptions . EMPTY ,
183
+ options ,
176
184
new ActionListenerResponseHandler <>(actionListener , TransportCreatePitAction .CreateReaderContextResponse ::new )
177
185
);
178
186
}
@@ -183,12 +191,18 @@ public void sendCanMatch(
183
191
SearchTask task ,
184
192
final ActionListener <SearchService .CanMatchResponse > listener
185
193
) {
194
+
195
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , false );
196
+ if (options == null ) {
197
+ return ;
198
+ }
199
+
186
200
transportService .sendChildRequest (
187
201
connection ,
188
202
QUERY_CAN_MATCH_NAME ,
189
203
request ,
190
204
task ,
191
- TransportRequestOptions . EMPTY ,
205
+ options ,
192
206
new ActionListenerResponseHandler <>(listener , SearchService .CanMatchResponse ::new )
193
207
);
194
208
}
@@ -223,11 +237,18 @@ public void sendExecuteDfs(
223
237
SearchTask task ,
224
238
final SearchActionListener <DfsSearchResult > listener
225
239
) {
240
+
241
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , true );
242
+ if (options == null ) {
243
+ return ;
244
+ }
245
+
226
246
transportService .sendChildRequest (
227
247
connection ,
228
248
DFS_ACTION_NAME ,
229
249
request ,
230
250
task ,
251
+ options ,
231
252
new ConnectionCountingHandler <>(listener , DfsSearchResult ::new , clientConnections , connection .getNode ().getId ())
232
253
);
233
254
}
@@ -243,12 +264,18 @@ public void sendExecuteQuery(
243
264
final boolean fetchDocuments = request .numberOfShards () == 1 ;
244
265
Writeable .Reader <SearchPhaseResult > reader = fetchDocuments ? QueryFetchSearchResult ::new : QuerySearchResult ::new ;
245
266
267
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , true );
268
+ if (options == null ) {
269
+ return ;
270
+ }
271
+
246
272
final ActionListener handler = responseWrapper .apply (connection , listener );
247
273
transportService .sendChildRequest (
248
274
connection ,
249
275
QUERY_ACTION_NAME ,
250
276
request ,
251
277
task ,
278
+ options ,
252
279
new ConnectionCountingHandler <>(handler , reader , clientConnections , connection .getNode ().getId ())
253
280
);
254
281
}
@@ -259,11 +286,18 @@ public void sendExecuteQuery(
259
286
SearchTask task ,
260
287
final SearchActionListener <QuerySearchResult > listener
261
288
) {
289
+
290
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , true );
291
+ if (options == null ) {
292
+ return ;
293
+ }
294
+
262
295
transportService .sendChildRequest (
263
296
connection ,
264
297
QUERY_ID_ACTION_NAME ,
265
298
request ,
266
299
task ,
300
+ options ,
267
301
new ConnectionCountingHandler <>(listener , QuerySearchResult ::new , clientConnections , connection .getNode ().getId ())
268
302
);
269
303
}
@@ -274,11 +308,18 @@ public void sendExecuteScrollQuery(
274
308
SearchTask task ,
275
309
final SearchActionListener <ScrollQuerySearchResult > listener
276
310
) {
311
+
312
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , false );
313
+ if (options == null ) {
314
+ return ;
315
+ }
316
+
277
317
transportService .sendChildRequest (
278
318
connection ,
279
319
QUERY_SCROLL_ACTION_NAME ,
280
320
request ,
281
321
task ,
322
+ options ,
282
323
new ConnectionCountingHandler <>(listener , ScrollQuerySearchResult ::new , clientConnections , connection .getNode ().getId ())
283
324
);
284
325
}
@@ -323,11 +364,17 @@ private void sendExecuteFetch(
323
364
SearchTask task ,
324
365
final SearchActionListener <FetchSearchResult > listener
325
366
) {
367
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , false );
368
+ if (options == null ) {
369
+ return ;
370
+ }
371
+
326
372
transportService .sendChildRequest (
327
373
connection ,
328
374
action ,
329
375
request ,
330
376
task ,
377
+ options ,
331
378
new ConnectionCountingHandler <>(listener , FetchSearchResult ::new , clientConnections , connection .getNode ().getId ())
332
379
);
333
380
}
@@ -337,15 +384,42 @@ private void sendExecuteFetch(
337
384
*/
338
385
void sendExecuteMultiSearch (final MultiSearchRequest request , SearchTask task , final ActionListener <MultiSearchResponse > listener ) {
339
386
final Transport .Connection connection = transportService .getConnection (transportService .getLocalNode ());
387
+
388
+ TransportRequestOptions options = getTransportRequestOptions (task , listener ::onFailure , false );
389
+ if (options == null ) {
390
+ return ;
391
+ }
392
+
340
393
transportService .sendChildRequest (
341
394
connection ,
342
395
MultiSearchAction .NAME ,
343
396
request ,
344
397
task ,
398
+ options ,
345
399
new ConnectionCountingHandler <>(listener , MultiSearchResponse ::new , clientConnections , connection .getNode ().getId ())
346
400
);
347
401
}
348
402
403
+ public TransportRequestOptions getTransportRequestOptions (SearchTask task , Consumer <Exception > onFailure , boolean queryPhase ) {
404
+ if (task .timeoutMills () > 0 ) {
405
+ long leftTimeMills ;
406
+ if (queryPhase ) {
407
+ //it's costly in query phase.
408
+ leftTimeMills = task .queryPhaseTimeout () - (System .currentTimeMillis () - task .startTimeMills ());
409
+ } else {
410
+ leftTimeMills = task .timeoutMills () - (System .currentTimeMillis () - task .startTimeMills ());
411
+ }
412
+ if (leftTimeMills <= 0 ) {
413
+ onFailure .accept (new TaskCancelledException ("failed to execute fetch phase, timeout exceeded" ));
414
+ return null ;
415
+ } else {
416
+ return TransportRequestOptions .builder ().withTimeout (leftTimeMills ).build ();
417
+ }
418
+ } else {
419
+ return TransportRequestOptions .EMPTY ;
420
+ }
421
+ }
422
+
349
423
public RemoteClusterService getRemoteClusterService () {
350
424
return transportService .getRemoteClusterService ();
351
425
}
0 commit comments