|
16 | 16 | import org.opensearch.action.search.SearchResponse;
|
17 | 17 | import org.opensearch.common.Nullable;
|
18 | 18 | import org.opensearch.common.io.stream.BytesStreamOutput;
|
| 19 | +import org.opensearch.core.action.ActionListener; |
19 | 20 | import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
|
20 | 21 | import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
|
21 | 22 | import org.opensearch.core.common.io.stream.StreamInput;
|
22 | 23 | import org.opensearch.search.SearchPhaseResult;
|
23 | 24 |
|
| 25 | +import java.io.IOException; |
24 | 26 | import java.util.Collections;
|
25 | 27 | import java.util.List;
|
26 | 28 | import java.util.concurrent.TimeUnit;
|
@@ -117,92 +119,138 @@ protected void afterResponseProcessor(Processor processor, long timeInNanos) {}
|
117 | 119 |
|
118 | 120 | protected void onResponseProcessorFailed(Processor processor) {}
|
119 | 121 |
|
120 |
| - SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException { |
121 |
| - if (searchRequestProcessors.isEmpty() == false) { |
122 |
| - long pipelineStart = relativeTimeSupplier.getAsLong(); |
123 |
| - beforeTransformRequest(); |
124 |
| - try { |
125 |
| - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { |
126 |
| - request.writeTo(bytesStreamOutput); |
127 |
| - try (StreamInput in = bytesStreamOutput.bytes().streamInput()) { |
128 |
| - try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) { |
129 |
| - request = new SearchRequest(input); |
130 |
| - } |
131 |
| - } |
132 |
| - } |
133 |
| - for (SearchRequestProcessor processor : searchRequestProcessors) { |
134 |
| - beforeRequestProcessor(processor); |
135 |
| - long start = relativeTimeSupplier.getAsLong(); |
136 |
| - try { |
137 |
| - request = processor.processRequest(request); |
138 |
| - } catch (Exception e) { |
139 |
| - onRequestProcessorFailed(processor); |
140 |
| - if (processor.isIgnoreFailure()) { |
141 |
| - logger.warn( |
142 |
| - "The exception from request processor [" |
143 |
| - + processor.getType() |
144 |
| - + "] in the search pipeline [" |
145 |
| - + id |
146 |
| - + "] was ignored", |
147 |
| - e |
148 |
| - ); |
149 |
| - } else { |
150 |
| - throw e; |
151 |
| - } |
152 |
| - } finally { |
153 |
| - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); |
154 |
| - afterRequestProcessor(processor, took); |
155 |
| - } |
| 122 | + void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener) throws SearchPipelineProcessingException { |
| 123 | + if (searchRequestProcessors.isEmpty()) { |
| 124 | + requestListener.onResponse(request); |
| 125 | + return; |
| 126 | + } |
| 127 | + |
| 128 | + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { |
| 129 | + request.writeTo(bytesStreamOutput); |
| 130 | + try (StreamInput in = bytesStreamOutput.bytes().streamInput()) { |
| 131 | + try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) { |
| 132 | + request = new SearchRequest(input); |
156 | 133 | }
|
157 |
| - } catch (Exception e) { |
158 |
| - onTransformRequestFailure(); |
159 |
| - throw new SearchPipelineProcessingException(e); |
160 |
| - } finally { |
161 |
| - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); |
162 |
| - afterTransformRequest(took); |
163 | 134 | }
|
| 135 | + } catch (IOException e) { |
| 136 | + requestListener.onFailure(new SearchPipelineProcessingException(e)); |
| 137 | + return; |
164 | 138 | }
|
165 |
| - return request; |
| 139 | + |
| 140 | + ActionListener<SearchRequest> finalListener = getTerminalSearchRequestActionListener(requestListener); |
| 141 | + |
| 142 | + // Chain listeners back-to-front |
| 143 | + ActionListener<SearchRequest> currentListener = finalListener; |
| 144 | + for (int i = searchRequestProcessors.size() - 1; i >= 0; i--) { |
| 145 | + final ActionListener<SearchRequest> nextListener = currentListener; |
| 146 | + SearchRequestProcessor processor = searchRequestProcessors.get(i); |
| 147 | + currentListener = ActionListener.wrap(r -> { |
| 148 | + long start = relativeTimeSupplier.getAsLong(); |
| 149 | + beforeRequestProcessor(processor); |
| 150 | + processor.processRequestAsync(r, ActionListener.wrap(rr -> { |
| 151 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); |
| 152 | + afterRequestProcessor(processor, took); |
| 153 | + nextListener.onResponse(rr); |
| 154 | + }, e -> { |
| 155 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); |
| 156 | + afterRequestProcessor(processor, took); |
| 157 | + onRequestProcessorFailed(processor); |
| 158 | + if (processor.isIgnoreFailure()) { |
| 159 | + logger.warn( |
| 160 | + "The exception from request processor [" |
| 161 | + + processor.getType() |
| 162 | + + "] in the search pipeline [" |
| 163 | + + id |
| 164 | + + "] was ignored", |
| 165 | + e |
| 166 | + ); |
| 167 | + nextListener.onResponse(r); |
| 168 | + } else { |
| 169 | + nextListener.onFailure(new SearchPipelineProcessingException(e)); |
| 170 | + } |
| 171 | + })); |
| 172 | + }, finalListener::onFailure); |
| 173 | + } |
| 174 | + |
| 175 | + beforeTransformRequest(); |
| 176 | + currentListener.onResponse(request); |
166 | 177 | }
|
167 | 178 |
|
168 |
| - SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException { |
169 |
| - if (searchResponseProcessors.isEmpty() == false) { |
170 |
| - long pipelineStart = relativeTimeSupplier.getAsLong(); |
171 |
| - beforeTransformResponse(); |
172 |
| - try { |
173 |
| - for (SearchResponseProcessor processor : searchResponseProcessors) { |
174 |
| - beforeResponseProcessor(processor); |
175 |
| - long start = relativeTimeSupplier.getAsLong(); |
176 |
| - try { |
177 |
| - response = processor.processResponse(request, response); |
178 |
| - } catch (Exception e) { |
179 |
| - onResponseProcessorFailed(processor); |
180 |
| - if (processor.isIgnoreFailure()) { |
181 |
| - logger.warn( |
182 |
| - "The exception from response processor [" |
183 |
| - + processor.getType() |
184 |
| - + "] in the search pipeline [" |
185 |
| - + id |
186 |
| - + "] was ignored", |
187 |
| - e |
188 |
| - ); |
189 |
| - } else { |
190 |
| - throw e; |
191 |
| - } |
192 |
| - } finally { |
193 |
| - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); |
194 |
| - afterResponseProcessor(processor, took); |
| 179 | + private ActionListener<SearchRequest> getTerminalSearchRequestActionListener(ActionListener<SearchRequest> requestListener) { |
| 180 | + final long pipelineStart = relativeTimeSupplier.getAsLong(); |
| 181 | + |
| 182 | + return ActionListener.wrap(r -> { |
| 183 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); |
| 184 | + afterTransformRequest(took); |
| 185 | + requestListener.onResponse(new PipelinedRequest(this, r)); |
| 186 | + }, e -> { |
| 187 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); |
| 188 | + afterTransformRequest(took); |
| 189 | + onTransformRequestFailure(); |
| 190 | + requestListener.onFailure(new SearchPipelineProcessingException(e)); |
| 191 | + }); |
| 192 | + } |
| 193 | + |
| 194 | + ActionListener<SearchResponse> transformResponseListener(SearchRequest request, ActionListener<SearchResponse> responseListener) { |
| 195 | + if (searchResponseProcessors.isEmpty()) { |
| 196 | + // No response transformation necessary |
| 197 | + return responseListener; |
| 198 | + } |
| 199 | + |
| 200 | + long[] pipelineStart = new long[1]; |
| 201 | + |
| 202 | + final ActionListener<SearchResponse> originalListener = responseListener; |
| 203 | + responseListener = ActionListener.wrap(r -> { |
| 204 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]); |
| 205 | + afterTransformResponse(took); |
| 206 | + originalListener.onResponse(r); |
| 207 | + }, e -> { |
| 208 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]); |
| 209 | + afterTransformResponse(took); |
| 210 | + onTransformResponseFailure(); |
| 211 | + originalListener.onFailure(e); |
| 212 | + }); |
| 213 | + ActionListener<SearchResponse> finalListener = responseListener; // Jump directly to this one on exception. |
| 214 | + |
| 215 | + for (int i = searchResponseProcessors.size() - 1; i >= 0; i--) { |
| 216 | + final ActionListener<SearchResponse> currentFinalListener = responseListener; |
| 217 | + final SearchResponseProcessor processor = searchResponseProcessors.get(i); |
| 218 | + |
| 219 | + responseListener = ActionListener.wrap(r -> { |
| 220 | + beforeResponseProcessor(processor); |
| 221 | + final long start = relativeTimeSupplier.getAsLong(); |
| 222 | + processor.processResponseAsync(request, r, ActionListener.wrap(rr -> { |
| 223 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); |
| 224 | + afterResponseProcessor(processor, took); |
| 225 | + currentFinalListener.onResponse(rr); |
| 226 | + }, e -> { |
| 227 | + onResponseProcessorFailed(processor); |
| 228 | + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); |
| 229 | + afterResponseProcessor(processor, took); |
| 230 | + if (processor.isIgnoreFailure()) { |
| 231 | + logger.warn( |
| 232 | + "The exception from response processor [" |
| 233 | + + processor.getType() |
| 234 | + + "] in the search pipeline [" |
| 235 | + + id |
| 236 | + + "] was ignored", |
| 237 | + e |
| 238 | + ); |
| 239 | + // Pass the previous response through to the next processor in the chain |
| 240 | + currentFinalListener.onResponse(r); |
| 241 | + } else { |
| 242 | + currentFinalListener.onFailure(new SearchPipelineProcessingException(e)); |
195 | 243 | }
|
196 |
| - } |
197 |
| - } catch (Exception e) { |
198 |
| - onTransformResponseFailure(); |
199 |
| - throw new SearchPipelineProcessingException(e); |
200 |
| - } finally { |
201 |
| - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); |
202 |
| - afterTransformResponse(took); |
203 |
| - } |
| 244 | + })); |
| 245 | + }, finalListener::onFailure); |
204 | 246 | }
|
205 |
| - return response; |
| 247 | + final ActionListener<SearchResponse> chainListener = responseListener; |
| 248 | + return ActionListener.wrap(r -> { |
| 249 | + beforeTransformResponse(); |
| 250 | + pipelineStart[0] = relativeTimeSupplier.getAsLong(); |
| 251 | + chainListener.onResponse(r); |
| 252 | + }, originalListener::onFailure); |
| 253 | + |
206 | 254 | }
|
207 | 255 |
|
208 | 256 | <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
|
|
0 commit comments