Skip to content

Commit eb6471d

Browse files
[Bugfix] Fix incorrect document order when there's exception during batch ingest (#14341) (#14358)
(cherry picked from commit d480027)
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 9614561 commit eb6471d

File tree

3 files changed: +136 -15 lines changed

3 files changed: +136 -15 lines changed

server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java

Lines changed: 111 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,15 +60,18 @@
6060
import org.opensearch.plugins.Plugin;
6161
import org.opensearch.test.OpenSearchIntegTestCase;
6262
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
63+
import org.hamcrest.MatcherAssert;
6364

6465
import java.util.Arrays;
6566
import java.util.Collection;
6667
import java.util.Collections;
6768
import java.util.HashMap;
6869
import java.util.Map;
70+
import java.util.stream.Collectors;
6971

7072
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
7173
import static org.opensearch.test.NodeRoles.nonIngestNode;
74+
import static org.hamcrest.Matchers.containsInAnyOrder;
7275
import static org.hamcrest.Matchers.equalTo;
7376
import static org.hamcrest.Matchers.instanceOf;
7477
import static org.hamcrest.Matchers.notNullValue;
@@ -159,6 +162,14 @@ public void testSimulate() throws Exception {
159162
}
160163

161164
public void testBulkWithIngestFailures() throws Exception {
// Runs the shared randomized bulk-ingest scenario without setting a batch size on the bulk request.
165+
runBulkTestWithRandomDocs(false);
166+
}
167+
168+
public void testBulkWithIngestFailuresWithBatchSize() throws Exception {
// Runs the shared randomized bulk-ingest scenario with the bulk request's batch size set to the number of requests.
169+
runBulkTestWithRandomDocs(true);
170+
}
171+
172+
private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Exception {
162173
createIndex("index");
163174

164175
BytesReference source = BytesReference.bytes(
@@ -177,6 +188,9 @@ public void testBulkWithIngestFailures() throws Exception {
177188

178189
int numRequests = scaledRandomIntBetween(32, 128);
179190
BulkRequest bulkRequest = new BulkRequest();
191+
if (shouldSetBatchSize) {
192+
bulkRequest.batchSize(numRequests);
193+
}
180194
for (int i = 0; i < numRequests; i++) {
181195
IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
182196
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0);
@@ -209,6 +223,103 @@ public void testBulkWithIngestFailures() throws Exception {
209223
assertTrue(deletePipelineResponse.isAcknowledged());
210224
}
211225

226+
public void testBulkWithIngestFailuresBatch() throws Exception {
// Sends a two-document bulk request (batch size 2) through pipeline "_id" and checks that the
// failure is reported on "_fail" while "_success" is indexed with its processed field set.
227+
createIndex("index");
228+
// Register pipeline "_id" containing a single "test" processor with an empty configuration.
// NOTE(review): the "test" processor is defined outside this hunk; presumably it fails documents
// whose "fail" field is true and sets "processed" on the others - confirm against the test plugin.
229+
BytesReference source = BytesReference.bytes(
230+
jsonBuilder().startObject()
231+
.field("description", "my_pipeline")
232+
.startArray("processors")
233+
.startObject()
234+
.startObject("test")
235+
.endObject()
236+
.endObject()
237+
.endArray()
238+
.endObject()
239+
);
240+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
241+
client().admin().cluster().putPipeline(putPipelineRequest).get();
242+
// Both documents fit in one batch; the document flagged fail=true is added first.
243+
BulkRequest bulkRequest = new BulkRequest();
244+
bulkRequest.batchSize(2);
245+
bulkRequest.add(
246+
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
247+
);
248+
bulkRequest.add(
249+
new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false)
250+
);
251+
// Every request must produce exactly one response item.
252+
BulkResponse response = client().bulk(bulkRequest).actionGet();
253+
MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));
254+
// Key the response items by document id so the assertions below do not depend on item order.
255+
Map<String, BulkItemResponse> results = Arrays.stream(response.getItems())
256+
.collect(Collectors.toMap(BulkItemResponse::getId, r -> r));
257+
// The failure must be attached to "_fail" and not to "_success".
258+
MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success"));
259+
assertNotNull(results.get("_fail").getFailure());
260+
assertNull(results.get("_success").getFailure());
261+
262+
// verify field of successful doc
263+
Map<String, Object> successDoc = client().prepareGet("index", "_success").get().getSourceAsMap();
264+
assertThat(successDoc.get("processed"), equalTo(true));
265+
266+
// cleanup
267+
AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
268+
assertTrue(deletePipelineResponse.isAcknowledged());
269+
}
270+
271+
public void testBulkWithIngestFailuresAndDropBatch() throws Exception {
// Sends a three-document bulk request (batch size 3) through pipeline "_id" and checks the
// per-document outcome: "_fail" carries a failure, "_success" is indexed, "_drop" is not in the index.
272+
createIndex("index");
273+
// Register pipeline "_id" containing a single "test" processor with an empty configuration.
// NOTE(review): the "test" processor is defined outside this hunk; presumably it fails documents
// with fail=true, drops documents with drop=true and sets "processed" otherwise - confirm.
274+
BytesReference source = BytesReference.bytes(
275+
jsonBuilder().startObject()
276+
.field("description", "my_pipeline")
277+
.startArray("processors")
278+
.startObject()
279+
.startObject("test")
280+
.endObject()
281+
.endObject()
282+
.endArray()
283+
.endObject()
284+
);
285+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
286+
client().admin().cluster().putPipeline(putPipelineRequest).get();
287+
// All three documents fit in one batch: one flagged fail=true, one fail=false, one drop=true.
288+
BulkRequest bulkRequest = new BulkRequest();
289+
bulkRequest.batchSize(3);
290+
bulkRequest.add(
291+
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
292+
);
293+
bulkRequest.add(
294+
new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false)
295+
);
296+
bulkRequest.add(
297+
new IndexRequest("index").id("_drop").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true)
298+
);
299+
// Every request, including the one flagged drop=true, must produce exactly one response item.
300+
BulkResponse response = client().bulk(bulkRequest).actionGet();
301+
MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));
302+
// Key the response items by document id so the assertions below do not depend on item order.
303+
Map<String, BulkItemResponse> results = Arrays.stream(response.getItems())
304+
.collect(Collectors.toMap(BulkItemResponse::getId, r -> r));
305+
// Only "_fail" may carry a failure; the "_drop" item is expected to have none.
306+
MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success", "_drop"));
307+
assertNotNull(results.get("_fail").getFailure());
308+
assertNull(results.get("_success").getFailure());
309+
assertNull(results.get("_drop").getFailure());
310+
311+
// verify dropped doc not in index
312+
assertNull(client().prepareGet("index", "_drop").get().getSourceAsMap());
313+
314+
// verify field of successful doc
315+
Map<String, Object> successDoc = client().prepareGet("index", "_success").get().getSourceAsMap();
316+
assertThat(successDoc.get("processed"), equalTo(true));
317+
318+
// cleanup
319+
AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
320+
assertTrue(deletePipelineResponse.isAcknowledged());
321+
}
322+
212323
public void testBulkWithUpsert() throws Exception {
213324
createIndex("index");
214325

server/src/main/java/org/opensearch/ingest/IngestService.java

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -775,7 +775,7 @@ private void executePipelinesInBatchRequests(
775775
),
776776
results.get(i).getException()
777777
);
778-
onFailure.accept(slots.get(i), results.get(i).getException());
778+
onFailure.accept(results.get(i).getSlot(), results.get(i).getException());
779779
}
780780
}
781781

@@ -1092,15 +1092,15 @@ private void innerBatchExecute(
10921092
}
10931093
if (!exceptions.isEmpty()) {
10941094
totalMetrics.failedN(exceptions.size());
1095-
} else if (!dropped.isEmpty()) {
1095+
}
1096+
if (!dropped.isEmpty()) {
10961097
dropped.forEach(t -> itemDroppedHandler.accept(t.getSlot()));
1097-
} else {
1098-
for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) {
1099-
updateIndexRequestWithIngestDocument(
1100-
slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()),
1101-
ingestDocumentWrapper.getIngestDocument()
1102-
);
1103-
}
1098+
}
1099+
for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) {
1100+
updateIndexRequestWithIngestDocument(
1101+
slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()),
1102+
ingestDocumentWrapper.getIngestDocument()
1103+
);
11041104
}
11051105
handler.accept(allResults);
11061106
}

server/src/test/java/org/opensearch/ingest/IngestServiceTests.java

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -97,6 +97,7 @@
9797
import java.util.function.LongSupplier;
9898
import java.util.stream.Collectors;
9999

100+
import org.mockito.ArgumentCaptor;
100101
import org.mockito.ArgumentMatcher;
101102
import org.mockito.invocation.InvocationOnMock;
102103

@@ -1894,7 +1895,7 @@ public void testExecuteBulkRequestInBatchWithException() {
18941895
verify(mockCompoundProcessor, never()).execute(any(), any());
18951896
}
18961897

1897-
public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
1898+
public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() {
18981899
CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
18991900
IngestService ingestService = createWithProcessors(
19001901
Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
@@ -1906,11 +1907,14 @@ public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
19061907
bulkRequest.add(indexRequest1);
19071908
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
19081909
bulkRequest.add(indexRequest2);
1909-
bulkRequest.batchSize(2);
1910+
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1911+
bulkRequest.add(indexRequest3);
1912+
bulkRequest.batchSize(3);
19101913

19111914
List<IngestDocumentWrapper> results = Arrays.asList(
19121915
new IngestDocumentWrapper(0, IngestService.toIngestDocument(indexRequest1), null),
1913-
new IngestDocumentWrapper(1, null, new RuntimeException())
1916+
new IngestDocumentWrapper(1, null, new RuntimeException()),
1917+
new IngestDocumentWrapper(2, null, null)
19141918
);
19151919
doAnswer(args -> {
19161920
@SuppressWarnings("unchecked")
@@ -1923,16 +1927,22 @@ public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
19231927
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
19241928
@SuppressWarnings("unchecked")
19251929
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
1930+
final IntConsumer dropHandler = mock(IntConsumer.class);
19261931
ingestService.executeBulkRequest(
1927-
2,
1932+
3,
19281933
bulkRequest.requests(),
19291934
failureHandler,
19301935
completionHandler,
1931-
indexReq -> {},
1936+
dropHandler,
19321937
Names.WRITE,
19331938
bulkRequest
19341939
);
1935-
verify(failureHandler, times(1)).accept(any(), any());
1940+
ArgumentCaptor<Integer> failureSlotCaptor = ArgumentCaptor.forClass(Integer.class);
1941+
verify(failureHandler, times(1)).accept(failureSlotCaptor.capture(), any());
1942+
assertEquals(1, failureSlotCaptor.getValue().intValue());
1943+
ArgumentCaptor<Integer> dropSlotCaptor = ArgumentCaptor.forClass(Integer.class);
1944+
verify(dropHandler, times(1)).accept(dropSlotCaptor.capture());
1945+
assertEquals(2, dropSlotCaptor.getValue().intValue());
19361946
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
19371947
verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
19381948
verify(mockCompoundProcessor, never()).execute(any(), any());

0 commit comments

Comments
 (0)