Skip to content

Commit 9a3fc30

Browse files
authored
Fix ingest pipeline not being executed when retrying failed index requests for update_by_query and reindex (#18003)
* Fix ingest pipeline not being executed when retrying failed index requests for the update_by_query API and reindex API Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Modify changelog Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Optimize some code Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Fix precommit failure Signed-off-by: Binlong Gao <gbinlong@amazon.com> --------- Signed-off-by: Binlong Gao <gbinlong@amazon.com>
1 parent 8932876 commit 9a3fc30

File tree

10 files changed

+256
-13
lines changed

10 files changed

+256
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
### Removed
2626

2727
### Fixed
28+
- Fix ingest pipeline not being executed when retrying failed index requests for the update_by_query API and reindex API ([#18003](https://github.com/opensearch-project/OpenSearch/pull/18003))
2829
- With the creation of FilterFieldType, we need to unwrap all the MappedFieldType instances before using the instanceof check. ([#17951](https://github.com/opensearch-project/OpenSearch/pull/17951))
2930
- Fix an infinite loop that can potentially be triggered by simultaneously creating a snapshot and updating the repository ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))
3031
- Remove package org.opensearch.transport.grpc and replace with org.opensearch.plugin.transport.grpc ([#18031](https://github.com/opensearch-project/OpenSearch/pull/18031))

modules/reindex/src/main/java/org/opensearch/index/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
221221
*/
222222
protected abstract RequestWrapper<?> buildRequest(ScrollableHitSource.Hit doc);
223223

224+
/**
225+
* Build the {@link BulkRequest} for a new bulk operation.
226+
* @return the new BulkRequest
227+
*/
228+
protected BulkRequest buildBulkRequest() {
229+
return new BulkRequest();
230+
}
231+
224232
/**
225233
* Copies the metadata from a hit to the request.
226234
*/
@@ -254,7 +262,7 @@ protected boolean accept(ScrollableHitSource.Hit doc) {
254262
}
255263

256264
private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
257-
BulkRequest bulkRequest = new BulkRequest();
265+
BulkRequest bulkRequest = buildBulkRequest();
258266
for (ScrollableHitSource.Hit doc : docs) {
259267
if (accept(doc)) {
260268
RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);

modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.opensearch.action.DocWriteRequest;
4949
import org.opensearch.action.bulk.BackoffPolicy;
5050
import org.opensearch.action.bulk.BulkItemResponse;
51+
import org.opensearch.action.bulk.BulkRequest;
5152
import org.opensearch.action.index.IndexRequest;
5253
import org.opensearch.client.RestClient;
5354
import org.opensearch.client.RestClientBuilder;
@@ -356,6 +357,11 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
356357
return super.buildScriptApplier();
357358
}
358359

360+
@Override
361+
protected BulkRequest buildBulkRequest() {
362+
return new BulkRequest().pipeline(mainRequest.getDestination().getPipeline());
363+
}
364+
359365
@Override
360366
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
361367
IndexRequest index = new IndexRequest();

modules/reindex/src/main/java/org/opensearch/index/reindex/TransportUpdateByQueryAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
133133
return super.buildScriptApplier();
134134
}
135135

136+
@Override
137+
protected org.opensearch.action.bulk.BulkRequest buildBulkRequest() {
138+
return new org.opensearch.action.bulk.BulkRequest().pipeline(mainRequest.getPipeline());
139+
}
140+
136141
@Override
137142
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
138143
IndexRequest index = new IndexRequest();

server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,39 @@ public static class Failure implements Writeable, ToXContentFragment {
197197
private final long seqNo;
198198
private final long term;
199199
private final boolean aborted;
200+
private final FailureSource source;
201+
202+
/**
203+
* The source of the failure; denotes which step of the bulk processing failed.
204+
*/
205+
@PublicApi(since = "3.0.0")
206+
public enum FailureSource {
207+
UNKNOWN((byte) 0),
208+
// Pipeline execution failure
209+
PIPELINE((byte) 1),
210+
VALIDATION((byte) 2),
211+
WRITE_PROCESSING((byte) 3);
212+
213+
private final byte sourceType;
214+
215+
FailureSource(byte sourceType) {
216+
this.sourceType = sourceType;
217+
}
218+
219+
public byte getSourceType() {
220+
return sourceType;
221+
}
222+
223+
public static FailureSource fromSourceType(byte sourceType) {
224+
return switch (sourceType) {
225+
case 0 -> UNKNOWN;
226+
case 1 -> PIPELINE;
227+
case 2 -> VALIDATION;
228+
case 3 -> WRITE_PROCESSING;
229+
default -> throw new IllegalArgumentException("Unknown failure source: [" + sourceType + "]");
230+
};
231+
}
232+
}
200233

201234
public static final ConstructingObjectParser<Failure, Void> PARSER = new ConstructingObjectParser<>(
202235
"bulk_failures",
@@ -224,7 +257,21 @@ public Failure(String index, String id, Exception cause) {
224257
ExceptionsHelper.status(cause),
225258
SequenceNumbers.UNASSIGNED_SEQ_NO,
226259
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
227-
false
260+
false,
261+
FailureSource.UNKNOWN
262+
);
263+
}
264+
265+
public Failure(String index, String id, Exception cause, FailureSource source) {
266+
this(
267+
index,
268+
id,
269+
cause,
270+
ExceptionsHelper.status(cause),
271+
SequenceNumbers.UNASSIGNED_SEQ_NO,
272+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
273+
false,
274+
source
228275
);
229276
}
230277

@@ -236,27 +283,47 @@ public Failure(String index, String id, Exception cause, boolean aborted) {
236283
ExceptionsHelper.status(cause),
237284
SequenceNumbers.UNASSIGNED_SEQ_NO,
238285
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
239-
aborted
286+
aborted,
287+
FailureSource.UNKNOWN
240288
);
241289
}
242290

243291
public Failure(String index, String id, Exception cause, RestStatus status) {
244-
this(index, id, cause, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
292+
this(
293+
index,
294+
id,
295+
cause,
296+
status,
297+
SequenceNumbers.UNASSIGNED_SEQ_NO,
298+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
299+
false,
300+
FailureSource.UNKNOWN
301+
);
245302
}
246303

247304
/** For write failures after operation was assigned a sequence number. */
248305
public Failure(String index, String id, Exception cause, long seqNo, long term) {
249-
this(index, id, cause, ExceptionsHelper.status(cause), seqNo, term, false);
250-
}
251-
252-
private Failure(String index, String id, Exception cause, RestStatus status, long seqNo, long term, boolean aborted) {
306+
this(index, id, cause, ExceptionsHelper.status(cause), seqNo, term, false, FailureSource.UNKNOWN);
307+
}
308+
309+
private Failure(
310+
String index,
311+
String id,
312+
Exception cause,
313+
RestStatus status,
314+
long seqNo,
315+
long term,
316+
boolean aborted,
317+
FailureSource source
318+
) {
253319
this.index = index;
254320
this.id = id;
255321
this.cause = cause;
256322
this.status = status;
257323
this.seqNo = seqNo;
258324
this.term = term;
259325
this.aborted = aborted;
326+
this.source = source;
260327
}
261328

262329
/**
@@ -275,6 +342,11 @@ public Failure(StreamInput in) throws IOException {
275342
seqNo = in.readZLong();
276343
term = in.readVLong();
277344
aborted = in.readBoolean();
345+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
346+
source = FailureSource.fromSourceType(in.readByte());
347+
} else {
348+
source = FailureSource.UNKNOWN;
349+
}
278350
}
279351

280352
@Override
@@ -288,6 +360,9 @@ public void writeTo(StreamOutput out) throws IOException {
288360
out.writeZLong(seqNo);
289361
out.writeVLong(term);
290362
out.writeBoolean(aborted);
363+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
364+
out.writeByte(source.getSourceType());
365+
}
291366
}
292367

293368
/**
@@ -352,6 +427,10 @@ public boolean isAborted() {
352427
return aborted;
353428
}
354429

430+
public FailureSource getSource() {
431+
return source;
432+
}
433+
355434
@Override
356435
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
357436
builder.field(INDEX_FIELD, index);

server/src/main/java/org/opensearch/action/bulk/Retry.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import org.apache.logging.log4j.LogManager;
3535
import org.apache.logging.log4j.Logger;
36+
import org.opensearch.action.DocWriteRequest;
37+
import org.opensearch.action.index.IndexRequest;
3638
import org.opensearch.action.support.PlainActionFuture;
3739
import org.opensearch.common.unit.TimeValue;
3840
import org.opensearch.core.action.ActionListener;
@@ -169,11 +171,25 @@ private void retry(BulkRequest bulkRequestForRetry) {
169171
}
170172

171173
private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
172-
BulkRequest requestToReissue = new BulkRequest();
174+
// global pipeline should be set in the new Bulk Request
175+
String globalPipeline = this.currentBulkRequest.pipeline();
176+
BulkRequest requestToReissue = new BulkRequest().pipeline(globalPipeline);
173177
int index = 0;
174178
for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
175179
if (bulkItemResponse.isFailed()) {
176-
requestToReissue.add(currentBulkRequest.requests().get(index));
180+
DocWriteRequest<?> docWriteRequest = currentBulkRequest.requests().get(index);
181+
// when pipeline execution failed with a retryable exception, the pipeline needs to be executed again
182+
if (bulkItemResponse.getFailure().getSource() == BulkItemResponse.Failure.FailureSource.PIPELINE
183+
&& docWriteRequest instanceof IndexRequest indexRequest) {
184+
// Reset the pipeline configuration for retry: after the first execution the pipeline was set to _none, so we need to
185+
// reset it
186+
// to the global pipeline if the global pipeline exists;
187+
// if not, set it to null to ensure the default pipeline can be resolved and set,
188+
// see org.opensearch.ingest.IngestService.resolvePipelines()
189+
indexRequest.setPipeline(globalPipeline);
190+
indexRequest.isPipelineResolved(false);
191+
}
192+
requestToReissue.add(docWriteRequest);
177193
}
178194
index++;
179195
}

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,12 @@ synchronized void markItemAsFailed(int slot, Exception e) {
10771077
// 2) Add a bulk item failure for this request
10781078
// 3) Continue with the next request in the bulk.
10791079
failedSlots.set(slot);
1080-
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e);
1080+
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(
1081+
indexRequest.index(),
1082+
indexRequest.id(),
1083+
e,
1084+
BulkItemResponse.Failure.FailureSource.PIPELINE
1085+
);
10811086
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
10821087
}
10831088

server/src/test/java/org/opensearch/action/bulk/BulkItemResponseTests.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.opensearch.ExceptionsHelper;
3636
import org.opensearch.OpenSearchException;
37+
import org.opensearch.Version;
3738
import org.opensearch.action.DocWriteRequest;
3839
import org.opensearch.action.DocWriteResponse;
3940
import org.opensearch.action.bulk.BulkItemResponse.Failure;
@@ -42,11 +43,15 @@
4243
import org.opensearch.action.update.UpdateResponse;
4344
import org.opensearch.action.update.UpdateResponseTests;
4445
import org.opensearch.common.collect.Tuple;
46+
import org.opensearch.common.io.stream.BytesStreamOutput;
4547
import org.opensearch.common.xcontent.XContentType;
4648
import org.opensearch.core.common.bytes.BytesReference;
49+
import org.opensearch.core.common.io.stream.StreamInput;
50+
import org.opensearch.core.rest.RestStatus;
4751
import org.opensearch.core.xcontent.ToXContent;
4852
import org.opensearch.core.xcontent.XContentParser;
4953
import org.opensearch.test.OpenSearchTestCase;
54+
import org.opensearch.test.VersionUtils;
5055

5156
import java.io.IOException;
5257

@@ -129,6 +134,90 @@ public void testFailureToAndFromXContent() throws IOException {
129134
assertBulkItemResponse(expectedBulkItemResponse, parsedBulkItemResponse);
130135
}
131136

137+
public void testSerializationForFailure() throws Exception {
138+
final Failure failure = new Failure("index", "id", new OpenSearchException("test"));
139+
try (BytesStreamOutput out = new BytesStreamOutput()) {
140+
failure.writeTo(out);
141+
142+
final Failure deserializedFailure;
143+
try (StreamInput in = out.bytes().streamInput()) {
144+
deserializedFailure = new Failure(in);
145+
}
146+
assertEquals(failure.getIndex(), deserializedFailure.getIndex());
147+
assertEquals(failure.getId(), deserializedFailure.getId());
148+
assertEquals(failure.getMessage(), deserializedFailure.getMessage());
149+
assertEquals(failure.getStatus(), deserializedFailure.getStatus());
150+
assertEquals(failure.getSource(), deserializedFailure.getSource());
151+
assertDeepEquals((OpenSearchException) failure.getCause(), (OpenSearchException) deserializedFailure.getCause());
152+
}
153+
}
154+
155+
public void testBwcSerialization() throws Exception {
156+
{
157+
final Failure failure = new Failure("index", "id", new OpenSearchException("test"));
158+
final Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
159+
try (BytesStreamOutput out = new BytesStreamOutput()) {
160+
out.setVersion(version);
161+
failure.writeTo(out);
162+
163+
try (StreamInput in = out.bytes().streamInput()) {
164+
in.setVersion(version);
165+
String index = in.readString();
166+
String id = in.readOptionalString();
167+
Exception cause = in.readException();
168+
RestStatus status = ExceptionsHelper.status(cause);
169+
long seqNo = in.readZLong();
170+
long term = in.readVLong();
171+
boolean aborted = in.readBoolean();
172+
Failure.FailureSource failureSource = Failure.FailureSource.UNKNOWN;
173+
if (version.onOrAfter(Version.V_3_0_0)) {
174+
failureSource = Failure.FailureSource.fromSourceType(in.readByte());
175+
}
176+
assertEquals(failure.getIndex(), index);
177+
assertEquals(failure.getId(), id);
178+
assertEquals(failure.getStatus(), status);
179+
assertEquals(failure.getSource(), failureSource);
180+
assertEquals(failure.getSeqNo(), seqNo);
181+
assertEquals(failure.getTerm(), term);
182+
assertEquals(failure.isAborted(), aborted);
183+
assertDeepEquals((OpenSearchException) failure.getCause(), (OpenSearchException) cause);
184+
}
185+
}
186+
}
187+
188+
{
189+
final Failure failure = new Failure("index", "id", new OpenSearchException("test"));
190+
final Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
191+
try (BytesStreamOutput out = new BytesStreamOutput()) {
192+
out.setVersion(version);
193+
out.writeString(failure.getIndex());
194+
out.writeOptionalString(failure.getId());
195+
out.writeException(failure.getCause());
196+
out.writeZLong(failure.getSeqNo());
197+
out.writeVLong(failure.getTerm());
198+
out.writeBoolean(failure.isAborted());
199+
if (version.onOrAfter(Version.V_3_0_0)) {
200+
out.writeByte(failure.getSource().getSourceType());
201+
}
202+
203+
final Failure deserializedFailure;
204+
try (StreamInput in = out.bytes().streamInput()) {
205+
in.setVersion(version);
206+
deserializedFailure = new Failure(in);
207+
}
208+
209+
assertEquals(failure.getIndex(), deserializedFailure.getIndex());
210+
assertEquals(failure.getId(), deserializedFailure.getId());
211+
assertEquals(failure.getStatus(), deserializedFailure.getStatus());
212+
assertEquals(failure.getSource(), deserializedFailure.getSource());
213+
assertEquals(failure.getSeqNo(), deserializedFailure.getSeqNo());
214+
assertEquals(failure.getTerm(), deserializedFailure.getTerm());
215+
assertEquals(failure.isAborted(), deserializedFailure.isAborted());
216+
assertDeepEquals((OpenSearchException) failure.getCause(), (OpenSearchException) deserializedFailure.getCause());
217+
}
218+
}
219+
}
220+
132221
public static void assertBulkItemResponse(BulkItemResponse expected, BulkItemResponse actual) {
133222
assertEquals(expected.getItemId(), actual.getItemId());
134223
assertEquals(expected.getIndex(), actual.getIndex());
@@ -145,7 +234,7 @@ public static void assertBulkItemResponse(BulkItemResponse expected, BulkItemRes
145234
assertEquals(expectedFailure.getId(), actualFailure.getId());
146235
assertEquals(expectedFailure.getMessage(), actualFailure.getMessage());
147236
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
148-
237+
assertEquals(expectedFailure.getSource(), actualFailure.getSource());
149238
assertDeepEquals((OpenSearchException) expectedFailure.getCause(), (OpenSearchException) actualFailure.getCause());
150239
} else {
151240
DocWriteResponse expectedDocResponse = expected.getResponse();

server/src/test/java/org/opensearch/action/bulk/BulkRequestModifierTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void testBulkRequestModifier() {
8989
assertThat(item.getFailure().getIndex(), equalTo("_index"));
9090
assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
9191
assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
92+
assertThat(item.getFailure().getSource(), equalTo(BulkItemResponse.Failure.FailureSource.PIPELINE));
9293
} else {
9394
assertThat(bulkResponse.getItems()[j], nullValue());
9495
}

0 commit comments

Comments
 (0)