@@ -189,7 +189,7 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
189189 int numRequests = scaledRandomIntBetween (32 , 128 );
190190 BulkRequest bulkRequest = new BulkRequest ();
191191 if (shouldSetBatchSize ) {
192- bulkRequest .batchSize (numRequests );
192+ bulkRequest .batchSize (scaledRandomIntBetween ( 2 , numRequests ) );
193193 }
194194 for (int i = 0 ; i < numRequests ; i ++) {
195195 IndexRequest indexRequest = new IndexRequest ("index" ).id (Integer .toString (i )).setPipeline ("_id" );
@@ -214,6 +214,9 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
214214 );
215215 assertThat (indexResponse , notNullValue ());
216216 assertThat (indexResponse .getId (), equalTo (Integer .toString (i )));
217+ // verify field of successful doc
218+ Map <String , Object > successDoc = client ().prepareGet ("index" , indexResponse .getId ()).get ().getSourceAsMap ();
219+ assertThat (successDoc .get ("processed" ), equalTo (true ));
217220 assertEquals (DocWriteResponse .Result .CREATED , indexResponse .getResult ());
218221 }
219222 }
@@ -223,51 +226,6 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
223226 assertTrue (deletePipelineResponse .isAcknowledged ());
224227 }
225228
226- public void testBulkWithIngestFailuresBatch () throws Exception {
227- createIndex ("index" );
228-
229- BytesReference source = BytesReference .bytes (
230- jsonBuilder ().startObject ()
231- .field ("description" , "my_pipeline" )
232- .startArray ("processors" )
233- .startObject ()
234- .startObject ("test" )
235- .endObject ()
236- .endObject ()
237- .endArray ()
238- .endObject ()
239- );
240- PutPipelineRequest putPipelineRequest = new PutPipelineRequest ("_id" , source , MediaTypeRegistry .JSON );
241- client ().admin ().cluster ().putPipeline (putPipelineRequest ).get ();
242-
243- BulkRequest bulkRequest = new BulkRequest ();
244- bulkRequest .batchSize (2 );
245- bulkRequest .add (
246- new IndexRequest ("index" ).id ("_fail" ).setPipeline ("_id" ).source (Requests .INDEX_CONTENT_TYPE , "field" , "value" , "fail" , true )
247- );
248- bulkRequest .add (
249- new IndexRequest ("index" ).id ("_success" ).setPipeline ("_id" ).source (Requests .INDEX_CONTENT_TYPE , "field" , "value" , "fail" , false )
250- );
251-
252- BulkResponse response = client ().bulk (bulkRequest ).actionGet ();
253- MatcherAssert .assertThat (response .getItems ().length , equalTo (bulkRequest .requests ().size ()));
254-
255- Map <String , BulkItemResponse > results = Arrays .stream (response .getItems ())
256- .collect (Collectors .toMap (BulkItemResponse ::getId , r -> r ));
257-
258- MatcherAssert .assertThat (results .keySet (), containsInAnyOrder ("_fail" , "_success" ));
259- assertNotNull (results .get ("_fail" ).getFailure ());
260- assertNull (results .get ("_success" ).getFailure ());
261-
262- // verify field of successful doc
263- Map <String , Object > successDoc = client ().prepareGet ("index" , "_success" ).get ().getSourceAsMap ();
264- assertThat (successDoc .get ("processed" ), equalTo (true ));
265-
266- // cleanup
267- AcknowledgedResponse deletePipelineResponse = client ().admin ().cluster ().prepareDeletePipeline ("_id" ).get ();
268- assertTrue (deletePipelineResponse .isAcknowledged ());
269- }
270-
271229 public void testBulkWithIngestFailuresAndDropBatch () throws Exception {
272230 createIndex ("index" );
273231
0 commit comments