From cfa50afb77b162f0473fb4c4a7fb3048dcf3a8d3 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 30 Dec 2022 14:32:14 -0500 Subject: [PATCH 1/8] [FLINK-30488] OpenSearch implementation of Async Sink Signed-off-by: Andriy Redko --- .../tests/OpensearchAsyncSinkExample.java | 81 +++++ .../4382f1f0-807a-45ff-97d8-42f72b6e9484 | 12 + flink-connector-opensearch/pom.xml | 8 + .../opensearch/sink/DocSerdeRequest.java | 107 ++++++ .../opensearch/sink/OpensearchAsyncSink.java | 150 ++++++++ .../sink/OpensearchAsyncSinkBuilder.java | 220 ++++++++++++ .../sink/OpensearchAsyncWriter.java | 274 ++++++++++++++ .../sink/OpensearchSinkBuilder.java | 2 +- .../sink/OpensearchWriterStateSerializer.java | 50 +++ .../sink/OpensearchAsyncSinkBuilderTest.java | 90 +++++ .../sink/OpensearchAsyncSinkITCase.java | 180 ++++++++++ .../sink/OpensearchAsyncWriterITCase.java | 334 ++++++++++++++++++ .../opensearch/sink/OpensearchSinkITCase.java | 43 +-- .../opensearch/sink/OpensearchTestClient.java | 4 + .../opensearch/sink/TestConverter.java | 76 ++++ 15 files changed, 1609 insertions(+), 22 deletions(-) create mode 100644 flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java diff --git a/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java b/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java new file mode 100644 index 0000000..065c3a8 --- /dev/null +++ b/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSink; +import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkBuilder; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +import org.apache.http.HttpHost; +import org.opensearch.action.index.IndexRequest; + +import java.util.ArrayList; +import java.util.List; + +/** End to end test for OpensearchAsyncSink. */ +public class OpensearchAsyncSinkExample { + + public static void main(String[] args) throws Exception { + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 2) { + System.out.println( + "Missing parameters!\n" + "Usage: --numRecords --index "); + return; + } + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(5000); + + DataStream> source = + env.fromSequence(0, parameterTool.getInt("numRecords") - 1) + .flatMap( + new FlatMapFunction>() { + @Override + public void flatMap( + Long value, Collector> out) { + final String key = String.valueOf(value); + final String message = "message #" + value; + out.collect(Tuple2.of(key, message + "update #1")); + out.collect(Tuple2.of(key, message + "update #2")); + } + }); + + List httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); + + OpensearchAsyncSinkBuilder> osSinkBuilder = + OpensearchAsyncSink.>builder() + .setHosts(new HttpHost("localhost:9200")) + .setElementConverter( + (element, context) -> + new IndexRequest("my-index") + .id(element.f0.toString()) + .source(element.f1)); + + source.sinkTo(osSinkBuilder.build()); + + env.execute("Opensearch end to end async sink test example"); + } +} diff --git a/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484 b/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484 index 02ea2fd..1ef9c43 100644 --- a/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484 +++ b/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484 @@ -1,3 +1,15 @@ +org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule +org.apache.flink.connector.opensearch.sink.OpensearchAsyncWriterITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy: only one of the following predicates match:\ * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ diff --git a/flink-connector-opensearch/pom.xml b/flink-connector-opensearch/pom.xml index fdc52b2..0cb47be 100644 --- a/flink-connector-opensearch/pom.xml +++ b/flink-connector-opensearch/pom.xml @@ -118,6 +118,14 @@ under the License. test-jar + + org.apache.flink + flink-connector-base + ${flink.version} + test + test-jar + + org.apache.flink diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java new file mode 100644 index 0000000..2edd97f --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable}, + * required by AsyncSink scaffolding. + * + * @param type of the write request + */ +@PublicEvolving +public class DocSerdeRequest implements Serializable { + private static final long serialVersionUID = 1L; + private final DocWriteRequest request; + + private DocSerdeRequest(DocWriteRequest request) { + this.request = request; + } + + public DocWriteRequest getRequest() { + return request; + } + + static DocSerdeRequest from(DocWriteRequest request) { + return new DocSerdeRequest<>(request); + } + + static DocSerdeRequest readFrom(long requestSize, DataInputStream in) throws IOException { + try (final StreamInput stream = new InputStreamStreamInput(in, requestSize)) { + return new DocSerdeRequest<>(readDocumentRequest(stream)); + } + } + + void writeTo(DataOutputStream out) throws IOException { + try (BytesStreamOutput stream = new BytesStreamOutput()) { + writeDocumentRequest(stream, request); + out.write(BytesReference.toBytes(stream.bytes())); + } + } + + /** Read a document write (index/delete/update) request. */ + private static DocWriteRequest readDocumentRequest(StreamInput in) throws IOException { + byte type = in.readByte(); + DocWriteRequest docWriteRequest; + if (type == 0) { + docWriteRequest = new IndexRequest(in); + } else if (type == 1) { + docWriteRequest = new DeleteRequest(in); + } else if (type == 2) { + docWriteRequest = new UpdateRequest(in); + } else { + throw new IllegalStateException("Invalid request type [" + type + " ]"); + } + return docWriteRequest; + } + + /** Write a document write (index/delete/update) request. */ + private static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) + throws IOException { + if (request instanceof IndexRequest) { + out.writeByte((byte) 0); + ((IndexRequest) request).writeTo(out); + } else if (request instanceof DeleteRequest) { + out.writeByte((byte) 1); + ((DeleteRequest) request).writeTo(out); + } else if (request instanceof UpdateRequest) { + out.writeByte((byte) 2); + ((UpdateRequest) request).writeTo(out); + } else { + throw new IllegalStateException( + "Invalid request [" + request.getClass().getSimpleName() + " ]"); + } + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java new file mode 100644 index 0000000..f62e55b --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.http.HttpHost; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Apache Flink's Async Sink to insert or update data in an Opensearch index (see please {@link + * OpensearchAsyncWriter}). + * + * @param type of the records converted to Opensearch actions (instances of {@link + * DocSerdeRequest}) + * @see OpensearchAsyncSinkBuilder on how to construct a OpensearchAsyncSink + */ +@PublicEvolving +public class OpensearchAsyncSink extends AsyncSinkBase> { + private static final long serialVersionUID = 1L; + + private final List hosts; + private final NetworkClientConfig networkClientConfig; + + /** + * Constructor creating an Opensearch async sink. + * + * @param maxBatchSize the maximum size of a batch of entries that may be sent + * @param maxInFlightRequests he maximum number of in flight requests that may exist, if any + * more in flight requests need to be initiated once the maximum has been reached, then it + * will be blocked until some have completed + * @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add + * elements will be blocked while the number of elements in the buffer is at the maximum + * @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to KDS + * measured in bytes + * @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the + * buffer, if any element reaches this age, the entire buffer will be flushed immediately + * @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the + * buffer, a record of size larger than this will be rejected when passed to the sink + * @param elementConverter converting incoming records to Opensearch write document requests + * @param hosts the reachable Opensearch cluster nodes + * @param networkClientConfig describing properties of the network connection used to connect to + * the Opensearch cluster + */ + OpensearchAsyncSink( + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + ElementConverter> elementConverter, + List hosts, + NetworkClientConfig networkClientConfig) { + super( + elementConverter, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes); + this.hosts = checkNotNull(hosts); + checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); + this.networkClientConfig = checkNotNull(networkClientConfig); + } + + /** + * Create a {@link OpensearchAsyncSinkBuilder} to construct a new {@link OpensearchAsyncSink}. + * + * @param type of incoming records + * @return {@link OpensearchAsyncSinkBuilder} + */ + public static OpensearchAsyncSinkBuilder builder() { + return new OpensearchAsyncSinkBuilder<>(); + } + + @Internal + @Override + public StatefulSinkWriter>> createWriter( + InitContext context) throws IOException { + return new OpensearchAsyncWriter<>( + context, + getElementConverter(), + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + hosts, + networkClientConfig, + Collections.emptyList()); + } + + @Internal + @Override + public StatefulSinkWriter>> restoreWriter( + InitContext context, + Collection>> recoveredState) + throws IOException { + return new OpensearchAsyncWriter<>( + context, + getElementConverter(), + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + hosts, + networkClientConfig, + recoveredState); + } + + @Internal + @Override + public SimpleVersionedSerializer>> + getWriterStateSerializer() { + return new OpensearchWriterStateSerializer(); + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java new file mode 100644 index 0000000..e1bdd65 --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.apache.http.HttpHost; +import org.opensearch.action.DocWriteRequest; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Builder to construct an Opensearch compatible {@link OpensearchAsyncSink}. + * + *

The following example shows the minimal setup to create a OpensearchAsyncSink that submits + * actions with the default number of actions to buffer (1000). + * + *

{@code
+ * OpensearchAsyncSink> sink = OpensearchAsyncSink
+ *     .>builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setElementConverter((element, context) ->
+ *         new IndexRequest("my-index").id(element.f0.toString()).source(element.f1));
+ *     .build();
+ * }
+ * + * @param type of the records converted to Opensearch actions + */ +@PublicEvolving +public class OpensearchAsyncSinkBuilder + extends AsyncSinkBaseBuilder< + InputT, DocSerdeRequest, OpensearchAsyncSinkBuilder> { + private List hosts; + private String username; + private String password; + private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer connectionRequestTimeout; + private Integer socketTimeout; + private Boolean allowInsecure; + private ElementConverter> elementConverter; + + /** + * Sets the element converter. + * + * @param elementConverter element converter + */ + public OpensearchAsyncSinkBuilder setElementConverter( + ElementConverter> elementConverter) { + this.elementConverter = + (element, context) -> + DocSerdeRequest.from(elementConverter.apply(element, context)); + return this; + } + + /** + * Sets the hosts where the Opensearch cluster nodes are reachable. + * + * @param hosts http addresses describing the node locations + * @return this builder + */ + public OpensearchAsyncSinkBuilder setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the username used to authenticate the connection with the Opensearch cluster. + * + * @param username of the Opensearch cluster user + * @return this builder + */ + public OpensearchAsyncSinkBuilder setConnectionUsername(String username) { + checkNotNull(username); + this.username = username; + return this; + } + + /** + * Sets the password used to authenticate the conection with the Opensearch cluster. + * + * @param password of the Opensearch cluster user + * @return this builder + */ + public OpensearchAsyncSinkBuilder setConnectionPassword(String password) { + checkNotNull(password); + this.password = password; + return this; + } + + /** + * Sets a prefix which used for every REST communication to the Opensearch cluster. + * + * @param prefix for the communication + * @return this builder + */ + public OpensearchAsyncSinkBuilder setConnectionPathPrefix(String prefix) { + checkNotNull(prefix); + this.connectionPathPrefix = prefix; + return this; + } + + /** + * Sets the timeout for requesting the connection of the Opensearch cluster from the connection + * manager. + * + * @param timeout for the connection request + * @return this builder + */ + public OpensearchAsyncSinkBuilder setConnectionRequestTimeout(int timeout) { + checkState(timeout >= 0, "Connection request timeout must be larger than or equal to 0."); + this.connectionRequestTimeout = timeout; + return this; + } + + /** + * Sets the timeout for establishing a connection of the Opensearch cluster. + * + * @param timeout for the connection + * @return this builder + */ + public OpensearchAsyncSinkBuilder setConnectionTimeout(int timeout) { + checkState(timeout >= 0, "Connection timeout must be larger than or equal to 0."); + this.connectionTimeout = timeout; + return this; + } + + /** + * Sets the timeout for waiting for data or, put differently, a maximum period inactivity + * between two consecutive data packets. + * + * @param timeout for the socket + * @return this builder + */ + public OpensearchAsyncSinkBuilder setSocketTimeout(int timeout) { + checkState(timeout >= 0, "Socket timeout must be larger than or equal to 0."); + this.socketTimeout = timeout; + return this; + } + + /** + * Allows to bypass the certificates chain validation and connect to insecure network endpoints + * (for example, servers which use self-signed certificates). + * + * @param allowInsecure allow or not to insecure network endpoints + * @return this builder + */ + public OpensearchAsyncSinkBuilder setAllowInsecure(boolean allowInsecure) { + this.allowInsecure = allowInsecure; + return this; + } + + @Override + public OpensearchAsyncSink build() { + checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); + + return new OpensearchAsyncSink( + nonNullOrDefault( + getMaxBatchSize(), + 1000), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION */ + nonNullOrDefault( + getMaxInFlightRequests(), 1), /* BulkProcessor::concurrentRequests */ + nonNullOrDefault(getMaxBufferedRequests(), 10000), + nonNullOrDefault( + getMaxBatchSizeInBytes(), + 2 * 1024 + * 1024), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION */ + nonNullOrDefault( + getMaxTimeInBufferMS(), + 1000), /* OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION */ + nonNullOrDefault(getMaxRecordSizeInBytes(), 1 * 1024 * 1024), /* 1Mb */ + elementConverter, + hosts, + buildNetworkClientConfig()); + } + + private static int nonNullOrDefault(Integer value, int defaultValue) { + return (value != null) ? value : defaultValue; + } + + private static long nonNullOrDefault(Long value, long defaultValue) { + return (value != null) ? value : defaultValue; + } + + private NetworkClientConfig buildNetworkClientConfig() { + return new NetworkClientConfig( + username, + password, + connectionPathPrefix, + connectionRequestTimeout, + connectionTimeout, + socketTimeout, + allowInsecure); + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java new file mode 100644 index 0000000..d5dcbaa --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContexts; +import org.opensearch.action.ActionListener; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Apache Flink's Async Sink Writer to insert or update data in an Opensearch index (see please + * {@link OpensearchAsyncSink}). + * + * @param type of the records converted to Opensearch actions (instances of {@link + * DocSerdeRequest}) + */ +@Internal +class OpensearchAsyncWriter extends AsyncSinkWriter> { + private static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncWriter.class); + + private final RestHighLevelClient client; + private final Counter numRecordsOutErrorsCounter; + private volatile boolean closed = false; + + /** + * Constructor creating an Opensearch async writer. + * + * @param context the initialization context + * @param elementConverter converting incoming records to Opensearch write document requests + * @param maxBatchSize the maximum size of a batch of entries that may be sent + * @param maxInFlightRequests he maximum number of in flight requests that may exist, if any + * more in flight requests need to be initiated once the maximum has been reached, then it + * will be blocked until some have completed + * @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add + * elements will be blocked while the number of elements in the buffer is at the maximum + * @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to KDS + * measured in bytes + * @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the + * buffer, if any element reaches this age, the entire buffer will be flushed immediately + * @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the + * buffer, a record of size larger than this will be rejected when passed to the sink + * @param hosts the reachable Opensearch cluster nodes + * @param networkClientConfig describing properties of the network connection used to connect to + * the Opensearch cluster + * @param initialStates the initial state of the sink + */ + OpensearchAsyncWriter( + Sink.InitContext context, + ElementConverter> elementConverter, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + List hosts, + NetworkClientConfig networkClientConfig, + Collection>> initialStates) { + super( + elementConverter, + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .build(), + initialStates); + + this.client = + new RestHighLevelClient( + configureRestClientBuilder( + RestClient.builder(hosts.toArray(new HttpHost[0])), + networkClientConfig)); + + final SinkWriterMetricGroup metricGroup = context.metricGroup(); + checkNotNull(metricGroup); + + this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter(); + } + + @Override + protected void submitRequestEntries( + List> requestEntries, + Consumer>> requestResult) { + + BulkRequest bulkRequest = new BulkRequest(); + requestEntries.forEach(r -> bulkRequest.add(r.getRequest())); + + final CompletableFuture future = new CompletableFuture<>(); + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + future.complete(response); + } + + @Override + public void onFailure(Exception e) { + future.completeExceptionally(e); + } + }); + + future.whenComplete( + (response, err) -> { + if (err != null) { + handleFullyFailedBulkRequest(err, requestEntries, requestResult); + } else if (response.hasFailures()) { + handlePartiallyFailedBulkRequests(response, requestEntries, requestResult); + } else { + requestResult.accept(Collections.emptyList()); + } + }); + } + + @Override + protected long getSizeInBytes(DocSerdeRequest requestEntry) { + return requestEntry.getRequest().ramBytesUsed(); + } + + @Override + public void close() { + if (!closed) { + closed = true; + + try { + client.close(); + } catch (final IOException ex) { + LOG.warn("Error while closing RestHighLevelClient instance", ex); + } + } + } + + private void handleFullyFailedBulkRequest( + Throwable err, + List> requestEntries, + Consumer>> requestResult) { + numRecordsOutErrorsCounter.inc(requestEntries.size()); + requestResult.accept(requestEntries); + } + + private void handlePartiallyFailedBulkRequests( + BulkResponse response, + List> requestEntries, + Consumer>> requestResult) { + + final List> failedRequestEntries = new ArrayList<>(); + final BulkItemResponse[] items = response.getItems(); + + for (int i = 0; i < items.length; i++) { + if (items[i].getFailure() != null) { + failedRequestEntries.add(DocSerdeRequest.from(requestEntries.get(i).getRequest())); + } + } + + numRecordsOutErrorsCounter.inc(failedRequestEntries.size()); + requestResult.accept(failedRequestEntries); + } + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, NetworkClientConfig networkClientConfig) { + if (networkClientConfig.getConnectionPathPrefix() != null) { + builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix()); + } + + builder.setHttpClientConfigCallback( + httpClientBuilder -> { + if (networkClientConfig.getPassword() != null + && networkClientConfig.getUsername() != null) { + final CredentialsProvider credentialsProvider = + new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials( + networkClientConfig.getUsername(), + networkClientConfig.getPassword())); + + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + if (networkClientConfig.isAllowInsecure().orElse(false)) { + try { + httpClientBuilder.setSSLContext( + SSLContexts.custom() + .loadTrustMaterial(new TrustAllStrategy()) + .build()); + } catch (final NoSuchAlgorithmException + | KeyStoreException + | KeyManagementException ex) { + throw new IllegalStateException( + "Unable to create custom SSL context", ex); + } + } + + return httpClientBuilder; + }); + if (networkClientConfig.getConnectionRequestTimeout() != null + || networkClientConfig.getConnectionTimeout() != null + || networkClientConfig.getSocketTimeout() != null) { + builder.setRequestConfigCallback( + requestConfigBuilder -> { + if (networkClientConfig.getConnectionRequestTimeout() != null) { + requestConfigBuilder.setConnectionRequestTimeout( + networkClientConfig.getConnectionRequestTimeout()); + } + if (networkClientConfig.getConnectionTimeout() != null) { + requestConfigBuilder.setConnectTimeout( + networkClientConfig.getConnectionTimeout()); + } + if (networkClientConfig.getSocketTimeout() != null) { + requestConfigBuilder.setSocketTimeout( + networkClientConfig.getSocketTimeout()); + } + return requestConfigBuilder; + }); + } + return builder; + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java index 895ca03..208ef37 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java @@ -38,7 +38,7 @@ * on checkpoint or the default number of actions was buffered (1000). * *
{@code
- * OpensearchSink sink = new OpensearchSinkBuilder()
+ * OpensearchSink> sink = new OpensearchSinkBuilder>()
  *     .setHosts(new HttpHost("localhost:9200")
  *     .setEmitter((element, context, indexer) -> {
  *          indexer.add(
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
new file mode 100644
index 0000000..e41dcaa
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Apache Flink's Async Sink write state for Opensearch document write requests (see please {@link
+ * DocWriteRequest}), wrapped into {@link DocSerdeRequest}.
+ */
+@Internal
+class OpensearchWriterStateSerializer extends AsyncSinkWriterStateSerializer> {
+    @Override
+    protected void serializeRequestToStream(DocSerdeRequest request, DataOutputStream out)
+            throws IOException {
+        request.writeTo(out);
+    }
+
+    @Override
+    protected DocSerdeRequest deserializeRequestFromStream(long requestSize, DataInputStream in)
+            throws IOException {
+        return DocSerdeRequest.readFrom(requestSize, in);
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
new file mode 100644
index 0000000..b36c077
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestFactory;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link OpensearchAsyncSinkBuilder}. */
+@ExtendWith(TestLoggerExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class OpensearchAsyncSinkBuilderTest {
+
+    @TestFactory
+    Stream testValidBuilders() {
+        Stream> validBuilders =
+                Stream.of(
+                        createMinimalBuilder(),
+                        createMinimalBuilder()
+                                .setConnectionUsername("username")
+                                .setConnectionPassword("password"));
+
+        return DynamicTest.stream(
+                validBuilders,
+                OpensearchAsyncSinkBuilder::toString,
+                builder -> assertThatNoException().isThrownBy(builder::build));
+    }
+
+    @Test
+    void testThrowIfHostsNotSet() {
+        assertThatThrownBy(
+                        () ->
+                                createEmptyBuilder()
+                                        .setElementConverter((element, context) -> null)
+                                        .build())
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    void testThrowIfElementConverterNotSet() {
+        assertThatThrownBy(
+                        () -> createEmptyBuilder().setHosts(new HttpHost("localhost:3000")).build())
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    void testThrowIfSetInvalidTimeouts() {
+        assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build())
+                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1).build())
+                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1).build())
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    private OpensearchAsyncSinkBuilder createEmptyBuilder() {
+        return new OpensearchAsyncSinkBuilder<>();
+    }
+
+    private OpensearchAsyncSinkBuilder createMinimalBuilder() {
+        return new OpensearchAsyncSinkBuilder<>()
+                .setElementConverter((element, context) -> null)
+                .setHosts(new HttpHost("localhost:3000"));
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java
new file mode 100644
index 0000000..1930553
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.opensearch.OpensearchUtil;
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OpensearchAsyncSink}. */
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class OpensearchAsyncSinkITCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncSinkITCase.class);
+    private static boolean failed;
+
+    private RestHighLevelClient client;
+    private OpensearchTestClient context;
+
+    @Container
+    private static final OpensearchContainer OS_CONTAINER =
+            OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG);
+
+    @BeforeEach
+    void setUp() {
+        failed = false;
+        client = OpensearchUtil.createClient(OS_CONTAINER);
+        context = new OpensearchTestClient(client);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("opensearchConverters")
+    void testWriteJsonToOpensearch(
+            BiFunction<
+                            String,
+                            String,
+                            ElementConverter, DocWriteRequest>>
+                    converterProvider)
+            throws Exception {
+        final String index = "test-opensearch-async-sink-" + UUID.randomUUID();
+        runTest(index, false, converterProvider, null);
+    }
+
+    @Test
+    void testRecovery() throws Exception {
+        final String index = "test-recovery-opensearch-async-sink";
+        runTest(index, true, TestConverter::jsonConverter, new FailingMapper());
+        assertThat(failed).isTrue();
+    }
+
+    private void runTest(
+            String index,
+            boolean allowRestarts,
+            BiFunction<
+                            String,
+                            String,
+                            ElementConverter, DocWriteRequest>>
+                    converterProvider,
+            @Nullable MapFunction additionalMapper)
+            throws Exception {
+        final OpensearchAsyncSinkBuilder> builder =
+                OpensearchAsyncSink.>builder()
+                        .setHosts(HttpHost.create(OS_CONTAINER.getHttpHostAddress()))
+                        .setElementConverter(
+                                converterProvider.apply(index, context.getDataFieldName()))
+                        .setMaxBatchSize(5)
+                        .setConnectionUsername(OS_CONTAINER.getUsername())
+                        .setConnectionPassword(OS_CONTAINER.getPassword())
+                        .setAllowInsecure(true);
+
+        try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
+            env.enableCheckpointing(100L);
+            if (!allowRestarts) {
+                env.setRestartStrategy(RestartStrategies.noRestart());
+            }
+            DataStream stream = env.fromSequence(1, 5);
+
+            if (additionalMapper != null) {
+                stream = stream.map(additionalMapper);
+            }
+
+            stream.map(
+                            new MapFunction>() {
+                                @Override
+                                public Tuple2 map(Long value) throws Exception {
+                                    return Tuple2.of(
+                                            value.intValue(),
+                                            OpensearchTestClient.buildMessage(value.intValue()));
+                                }
+                            })
+                    .sinkTo(builder.build());
+            env.execute();
+            context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+        }
+    }
+
+    private static List<
+                    BiFunction<
+                            String,
+                            String,
+                            ElementConverter, DocWriteRequest>>>
+            opensearchConverters() {
+        return Arrays.asList(TestConverter::jsonConverter, TestConverter::smileConverter);
+    }
+
+    private static class FailingMapper implements MapFunction, CheckpointListener {
+        private static final long serialVersionUID = 1L;
+        private int emittedRecords = 0;
+
+        @Override
+        public Long map(Long value) throws Exception {
+            Thread.sleep(50);
+            emittedRecords++;
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (failed || emittedRecords == 0) {
+                return;
+            }
+            failed = true;
+            throw new Exception("Expected failure");
+        }
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
new file mode 100644
index 0000000..313f85e
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.opensearch.OpensearchUtil;
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OpensearchAsyncWriter}. */
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class OpensearchAsyncWriterITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncWriterITCase.class);
+
+    @Container
+    private static final OpensearchContainer OS_CONTAINER =
+            OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG);
+
+    private RestHighLevelClient client;
+    private OpensearchTestClient clientContext;
+    private TestSinkInitContext context;
+
+    private final Lock lock = new ReentrantLock();
+    private final Condition completed = lock.newCondition();
+    private final List> requests = new ArrayList<>();
+
+    @BeforeEach
+    void setUp() {
+        client = OpensearchUtil.createClient(OS_CONTAINER);
+        clientContext = new OpensearchTestClient(client);
+        context = new TestSinkInitContext();
+        requests.clear();
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Test
+    @Timeout(5)
+    void testWriteOnBulkFlush() throws Exception {
+        final String index = "test-bulk-flush-without-checkpoint-async";
+        final int maxBatchSize = 5;
+
+        try (final OpensearchAsyncWriter> writer =
+                createWriter(context, index, maxBatchSize, Long.MAX_VALUE)) {
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+
+            // Ignore flush on checkpoint
+            writer.flush(false);
+            clientContext.assertThatIdsAreNotWritten(index, 1, 2, 3, 4);
+
+            // Trigger flush
+            writer.write(Tuple2.of(5, "test-5"), null);
+
+            /* await for async bulk request to complete */
+            awaitForCompletion();
+
+            clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+
+            writer.write(Tuple2.of(6, "test-6"), null);
+            clientContext.assertThatIdsAreNotWritten(index, 6);
+
+            // Force flush
+            writer.flush(true);
+            clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5, 6);
+        }
+    }
+
+    @Test
+    @Timeout(5)
+    void testWriteOnBulkIntervalFlush() throws Exception {
+        final String index = "test-bulk-flush-with-interval-async";
+
+        try (final OpensearchAsyncWriter> writer =
+                createWriter(context, index, 10, 1000 /* 1s */)) {
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+
+            /* advance timer */
+            context.getTestProcessingTimeService().advance(1200);
+
+            /* await for async bulk request to complete */
+            awaitForCompletion();
+        }
+
+        clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4);
+    }
+
+    @Test
+    void testIncrementByteOutMetric() throws Exception {
+        final String index = "test-inc-byte-out-async";
+        final int flushAfterNActions = 2;
+
+        try (final OpensearchAsyncWriter> writer =
+                createWriter(context, index, flushAfterNActions, Long.MAX_VALUE)) {
+            final Counter numBytesOut = context.getNumBytesOutCounter();
+            assertThat(numBytesOut.getCount()).isEqualTo(0);
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+
+            writer.flush(true);
+            long first = numBytesOut.getCount();
+
+            assertThat(first).isGreaterThan(0);
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+
+            writer.flush(true);
+            assertThat(numBytesOut.getCount()).isGreaterThan(first);
+        }
+    }
+
+    @Test
+    void testIncrementRecordsSendMetric() throws Exception {
+        final String index = "test-inc-records-send-async";
+        final int flushAfterNActions = 2;
+
+        try (final OpensearchAsyncWriter> writer =
+                createWriter(context, index, flushAfterNActions, Long.MAX_VALUE)) {
+            final Counter recordsSend = context.getNumRecordsOutCounter();
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            // Update existing index
+            writer.write(Tuple2.of(1, "u" + buildMessage(2)), null);
+            // Delete index
+            writer.write(Tuple2.of(1, "d" + buildMessage(3)), null);
+
+            writer.flush(true);
+
+            assertThat(recordsSend.getCount()).isEqualTo(3L);
+        }
+    }
+
+    @Test
+    void testCurrentSendTime() throws Exception {
+        final String index = "test-current-send-time-async";
+        final int flushAfterNActions = 2;
+
+        try (final OpensearchAsyncWriter> writer =
+                createWriter(context, index, flushAfterNActions, Long.MAX_VALUE)) {
+            final Optional> currentSendTime = context.getCurrentSendTimeGauge();
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+
+            writer.flush(true);
+
+            assertThat(currentSendTime).isPresent();
+            assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
+        }
+    }
+
+    @Test
+    @Timeout(5)
+    void testWriteError() throws Exception {
+        final String index = "test-bulk-flush-error-async";
+        final int maxBatchSize = 5;
+
+        try (final OpensearchAsyncWriter> writer =
+                createWriter(context, index, maxBatchSize, Long.MAX_VALUE)) {
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+
+            // Force flush
+            writer.flush(true);
+            clientContext.assertThatIdsAreWritten(index, 1);
+            assertThat(requests).hasSize(0);
+
+            // The "c" prefix should force the create mode and fail the bulk item request
+            // (duplicate)
+            writer.write(Tuple2.of(1, buildMessage(1, "c")), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+            writer.write(Tuple2.of(5, buildMessage(5)), null);
+
+            /* await for async bulk request to complete */
+            awaitForCompletion();
+
+            // Force flush
+            clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+            assertThat(requests).hasSize(1);
+        }
+    }
+
+    private OpensearchAsyncWriter> createWriter(
+            Sink.InitContext context, String index, int maxBatchSize, long maxTimeInBufferMS) {
+        return new OpensearchAsyncWriter>(
+                context,
+                new UpdatingElementConverter(index, clientContext.getDataFieldName()),
+                maxBatchSize,
+                1,
+                100,
+                Long.MAX_VALUE,
+                maxTimeInBufferMS,
+                Long.MAX_VALUE,
+                Collections.singletonList(HttpHost.create(OS_CONTAINER.getHttpHostAddress())),
+                new NetworkClientConfig(
+                        OS_CONTAINER.getUsername(),
+                        OS_CONTAINER.getPassword(),
+                        null,
+                        null,
+                        null,
+                        null,
+                        true),
+                Collections.emptyList()) {
+            @Override
+            protected void submitRequestEntries(
+                    List> requestEntries,
+                    Consumer>> requestResult) {
+                super.submitRequestEntries(
+                        requestEntries,
+                        (entries) -> {
+                            requestResult.accept(entries);
+
+                            lock.lock();
+                            try {
+                                requests.addAll(entries);
+                                completed.signal();
+                            } finally {
+                                lock.unlock();
+                            }
+                        });
+            }
+        };
+    }
+
+    private static class UpdatingElementConverter
+            implements ElementConverter, DocSerdeRequest> {
+        private static final long serialVersionUID = 1L;
+
+        private final String dataFieldName;
+        private final String index;
+
+        UpdatingElementConverter(String index, String dataFieldName) {
+            this.index = index;
+            this.dataFieldName = dataFieldName;
+        }
+
+        @Override
+        public DocSerdeRequest apply(Tuple2 element, Context context) {
+            Map document = new HashMap<>();
+            document.put(dataFieldName, element.f1);
+
+            final char action = element.f1.charAt(0);
+            final String id = element.f0.toString();
+            switch (action) {
+                case 'c':
+                    return DocSerdeRequest.from(
+                            new IndexRequest(index).create(true).id(id).source(document));
+                case 'd':
+                    return DocSerdeRequest.from(new DeleteRequest(index).id(id));
+                case 'u':
+                    return DocSerdeRequest.from(
+                            new UpdateRequest().index(index).id(id).doc(document));
+                default:
+                    return DocSerdeRequest.from(new IndexRequest(index).id(id).source(document));
+            }
+        }
+    }
+
+    private void awaitForCompletion() throws InterruptedException {
+        lock.lock();
+        try {
+            completed.await();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java
index c85e42b..482e2fe 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java
@@ -146,29 +146,30 @@ private void runTest(
                         .setAllowInsecure(true)
                         .build();
 
-        final StreamExecutionEnvironment env = new LocalStreamEnvironment();
-        env.enableCheckpointing(100L);
-        if (!allowRestarts) {
-            env.setRestartStrategy(RestartStrategies.noRestart());
-        }
-        DataStream stream = env.fromSequence(1, 5);
+        try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
+            env.enableCheckpointing(100L);
+            if (!allowRestarts) {
+                env.setRestartStrategy(RestartStrategies.noRestart());
+            }
+            DataStream stream = env.fromSequence(1, 5);
 
-        if (additionalMapper != null) {
-            stream = stream.map(additionalMapper);
-        }
+            if (additionalMapper != null) {
+                stream = stream.map(additionalMapper);
+            }
 
-        stream.map(
-                        new MapFunction>() {
-                            @Override
-                            public Tuple2 map(Long value) throws Exception {
-                                return Tuple2.of(
-                                        value.intValue(),
-                                        OpensearchTestClient.buildMessage(value.intValue()));
-                            }
-                        })
-                .sinkTo(sink);
-        env.execute();
-        context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+            stream.map(
+                            new MapFunction>() {
+                                @Override
+                                public Tuple2 map(Long value) throws Exception {
+                                    return Tuple2.of(
+                                            value.intValue(),
+                                            OpensearchTestClient.buildMessage(value.intValue()));
+                                }
+                            })
+                    .sinkTo(sink);
+            env.execute();
+            context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+        }
     }
 
     private static List>>>
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java
index 322ffc1..b0d42a7 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java
@@ -68,6 +68,10 @@ String getDataFieldName() {
         return DATA_FIELD_NAME;
     }
 
+    static String buildMessage(int id, String prefix) {
+        return prefix + buildMessage(id);
+    }
+
     static String buildMessage(int id) {
         return "test-" + id;
     }
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java
new file mode 100644
index 0000000..9883b03
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+class TestConverter implements ElementConverter, DocWriteRequest> {
+
+    private final String index;
+    private final XContentBuilderProvider xContentBuilderProvider;
+    private final String dataFieldName;
+
+    public static TestConverter jsonConverter(String index, String dataFieldName) {
+        return new TestConverter(index, dataFieldName, XContentFactory::jsonBuilder);
+    }
+
+    public static TestConverter smileConverter(String index, String dataFieldName) {
+        return new TestConverter(index, dataFieldName, XContentFactory::smileBuilder);
+    }
+
+    private TestConverter(
+            String index, String dataFieldName, XContentBuilderProvider xContentBuilderProvider) {
+        this.dataFieldName = dataFieldName;
+        this.index = index;
+        this.xContentBuilderProvider = xContentBuilderProvider;
+    }
+
+    @Override
+    public DocWriteRequest apply(Tuple2 element, Context context) {
+        return createIndexRequest(element);
+    }
+
+    public IndexRequest createIndexRequest(Tuple2 element) {
+        Map document = new HashMap<>();
+        document.put(dataFieldName, element.f1);
+        try {
+            return new IndexRequest(index)
+                    .id(element.f0.toString())
+                    .source(xContentBuilderProvider.getBuilder().map(document));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @FunctionalInterface
+    private interface XContentBuilderProvider extends Serializable {
+        XContentBuilder getBuilder() throws IOException;
+    }
+}

From cf0992fd1aad06e989d28fcfb7a22cedd9295ddb Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Wed, 8 Feb 2023 09:13:15 -0500
Subject: [PATCH 2/8] Addressing code review comments

Signed-off-by: Andriy Redko 
---
 .../tests/OpensearchAsyncSinkExample.java     |  8 +---
 .../opensearch/sink/OpensearchAsyncSink.java  |  6 +--
 .../sink/OpensearchAsyncSinkBuilder.java      | 45 +++++++++++++++----
 .../sink/OpensearchAsyncSinkBuilderTest.java  |  6 +--
 4 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java b/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java
index 065c3a8..111155c 100644
--- a/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java
+++ b/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java
@@ -29,9 +29,6 @@
 import org.apache.http.HttpHost;
 import org.opensearch.action.index.IndexRequest;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /** End to end test for OpensearchAsyncSink. */
 public class OpensearchAsyncSinkExample {
 
@@ -62,12 +59,9 @@ public void flatMap(
                                     }
                                 });
 
-        List httpHosts = new ArrayList<>();
-        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
-
         OpensearchAsyncSinkBuilder> osSinkBuilder =
                 OpensearchAsyncSink.>builder()
-                        .setHosts(new HttpHost("localhost:9200"))
+                        .setHosts(new HttpHost("localhost", 9200, "http"))
                         .setElementConverter(
                                 (element, context) ->
                                         new IndexRequest("my-index")
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
index f62e55b..6e062b3 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
@@ -54,13 +54,13 @@ public class OpensearchAsyncSink extends AsyncSinkBase setElementConverter(
      */
     public OpensearchAsyncSinkBuilder setHosts(HttpHost... hosts) {
         checkNotNull(hosts);
-        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        checkArgument(hosts.length > 0, "Hosts cannot be empty.");
         this.hosts = Arrays.asList(hosts);
         return this;
     }
@@ -97,6 +98,7 @@ public OpensearchAsyncSinkBuilder setHosts(HttpHost... hosts) {
      */
     public OpensearchAsyncSinkBuilder setConnectionUsername(String username) {
         checkNotNull(username);
+        checkArgument(!username.trim().isEmpty(), "Username cannot be empty");
         this.username = username;
         return this;
     }
@@ -109,6 +111,7 @@ public OpensearchAsyncSinkBuilder setConnectionUsername(String username)
      */
     public OpensearchAsyncSinkBuilder setConnectionPassword(String password) {
         checkNotNull(password);
+        checkArgument(!password.trim().isEmpty(), "Password cannot be empty");
         this.password = password;
         return this;
     }
@@ -129,11 +132,12 @@ public OpensearchAsyncSinkBuilder setConnectionPathPrefix(String prefix)
      * Sets the timeout for requesting the connection of the Opensearch cluster from the connection
      * manager.
      *
-     * @param timeout for the connection request
+     * @param timeout timeout for the connection request (in milliseconds)
      * @return this builder
      */
     public OpensearchAsyncSinkBuilder setConnectionRequestTimeout(int timeout) {
-        checkState(timeout >= 0, "Connection request timeout must be larger than or equal to 0.");
+        checkArgument(
+                timeout >= 0, "Connection request timeout must be larger than or equal to 0.");
         this.connectionRequestTimeout = timeout;
         return this;
     }
@@ -141,28 +145,53 @@ public OpensearchAsyncSinkBuilder setConnectionRequestTimeout(int timeou
     /**
      * Sets the timeout for establishing a connection of the Opensearch cluster.
      *
-     * @param timeout for the connection
+     * @param timeout timeout for the connection (in milliseconds)
      * @return this builder
      */
     public OpensearchAsyncSinkBuilder setConnectionTimeout(int timeout) {
-        checkState(timeout >= 0, "Connection timeout must be larger than or equal to 0.");
+        checkArgument(timeout >= 0, "Connection timeout must be larger than or equal to 0.");
         this.connectionTimeout = timeout;
         return this;
     }
 
+    /**
+     * Sets the timeout for establishing a connection of the Opensearch cluster.
+     *
+     * @param timeout timeout for the connection (in milliseconds)
+     * @param timeUnit timeout time unit
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder setConnectionTimeout(int timeout, TimeUnit timeUnit) {
+        checkNotNull(timeUnit, "TimeUnit cannot be null.");
+        return setConnectionTimeout((int) timeUnit.toMillis(timeout));
+    }
+
     /**
      * Sets the timeout for waiting for data or, put differently, a maximum period inactivity
      * between two consecutive data packets.
      *
-     * @param timeout for the socket
+     * @param timeout timeout for the socket (in milliseconds)
      * @return this builder
      */
     public OpensearchAsyncSinkBuilder setSocketTimeout(int timeout) {
-        checkState(timeout >= 0, "Socket timeout must be larger than or equal to 0.");
+        checkArgument(timeout >= 0, "Socket timeout must be larger than or equal to 0.");
         this.socketTimeout = timeout;
         return this;
     }
 
+    /**
+     * Sets the timeout for waiting for data or, put differently, a maximum period inactivity
+     * between two consecutive data packets.
+     *
+     * @param timeout timeout for the socket
+     * @param timeUnit timeout time unit
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder setSocketTimeout(int timeout, TimeUnit timeUnit) {
+        checkNotNull(timeUnit, "TimeUnit cannot be null.");
+        return setSocketTimeout((int) timeUnit.toMillis(timeout));
+    }
+
     /**
      * Allows to bypass the certificates chain validation and connect to insecure network endpoints
      * (for example, servers which use self-signed certificates).
@@ -177,8 +206,6 @@ public OpensearchAsyncSinkBuilder setAllowInsecure(boolean allowInsecure
 
     @Override
     public OpensearchAsyncSink build() {
-        checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
-
         return new OpensearchAsyncSink(
                 nonNullOrDefault(
                         getMaxBatchSize(),
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
index b36c077..bd0cdb8 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
@@ -71,11 +71,11 @@ void testThrowIfElementConverterNotSet() {
     @Test
     void testThrowIfSetInvalidTimeouts() {
         assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build())
-                .isInstanceOf(IllegalStateException.class);
+                .isInstanceOf(IllegalArgumentException.class);
         assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1).build())
-                .isInstanceOf(IllegalStateException.class);
+                .isInstanceOf(IllegalArgumentException.class);
         assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1).build())
-                .isInstanceOf(IllegalStateException.class);
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     private OpensearchAsyncSinkBuilder createEmptyBuilder() {

From 7213a0c0a61de82b141b1ffc7dfddcdc49fd0a29 Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Wed, 8 Feb 2023 09:20:14 -0500
Subject: [PATCH 3/8] Update Apache Flink to 1.16.1 (1.16.0 artifacts are not
 available anymore)

Signed-off-by: Andriy Redko 
---
 .../connector/opensearch/sink/OpensearchAsyncSinkBuilder.java    | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
index 00143e4..adbff7f 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
@@ -31,7 +31,6 @@
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Builder to construct an Opensearch compatible {@link OpensearchAsyncSink}.

From 5eb2fa4965d77f260c76933977dc10f7066822aa Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Wed, 8 Feb 2023 09:54:37 -0500
Subject: [PATCH 4/8] Addressing code review comments

Signed-off-by: Andriy Redko 
---
 .../sink/OpensearchAsyncWriter.java           | 36 ++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
index d5dcbaa..2bbbc17 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -34,6 +35,7 @@
 import org.apache.http.conn.ssl.TrustAllStrategy;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.ssl.SSLContexts;
+import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.bulk.BulkItemResponse;
 import org.opensearch.action.bulk.BulkRequest;
@@ -46,6 +48,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -73,6 +77,17 @@ class OpensearchAsyncWriter extends AsyncSinkWriter
+                                    err instanceof NoRouteToHostException
+                                            || err instanceof ConnectException,
+                            err ->
+                                    new OpenSearchException(
+                                            "Could not connect to Opensearch cluster using provided hosts",
+                                            err)));
+
     /**
      * Constructor creating an Opensearch async writer.
      *
@@ -186,12 +201,31 @@ public void close() {
         }
     }
 
+    private boolean isRetryable(Throwable err) {
+        // isFatal() is really isNotFatal()
+        if (!OPENSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
+            return false;
+        }
+        return true;
+    }
+
     private void handleFullyFailedBulkRequest(
             Throwable err,
             List> requestEntries,
             Consumer>> requestResult) {
+        final boolean retryable = isRetryable(err.getCause());
+
+        LOG.warn(
+                "Opensearch AsyncWwriter failed to persist {} entries (retryable = {})",
+                requestEntries.size(),
+                retryable,
+                err);
+
         numRecordsOutErrorsCounter.inc(requestEntries.size());
-        requestResult.accept(requestEntries);
+
+        if (retryable) {
+            requestResult.accept(requestEntries);
+        }
     }
 
     private void handlePartiallyFailedBulkRequests(

From 418c0e0602283455ef9162ea269acd9b3ad3d7dd Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Fri, 10 Feb 2023 13:35:45 -0500
Subject: [PATCH 5/8] Addressing code review comments

Signed-off-by: Andriy Redko 
---
 .github/workflows/push_pr.yml                 |  2 +-
 .../opensearch/sink/DocSerdeRequest.java      | 18 +++++++---------
 .../opensearch/sink/OpensearchAsyncSink.java  | 13 ++++++------
 .../sink/OpensearchAsyncSinkBuilder.java      |  5 ++---
 .../sink/OpensearchAsyncWriter.java           | 21 +++++++++----------
 .../sink/OpensearchWriterStateSerializer.java |  6 +++---
 .../sink/OpensearchAsyncWriterITCase.java     | 10 ++++-----
 pom.xml                                       |  2 +-
 8 files changed, 36 insertions(+), 41 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index e3a7f8b..e1ba2d4 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -25,4 +25,4 @@ jobs:
   compile_and_test:
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
-      flink_version: 1.16.1
+      flink_version: 1.16.0
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java
index 2edd97f..c01949f 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java
@@ -38,29 +38,27 @@
 /**
  * Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable},
  * required by AsyncSink scaffolding.
- *
- * @param  type of the write request
  */
 @PublicEvolving
-public class DocSerdeRequest implements Serializable {
+public class DocSerdeRequest implements Serializable {
     private static final long serialVersionUID = 1L;
-    private final DocWriteRequest request;
+    private final DocWriteRequest request;
 
-    private DocSerdeRequest(DocWriteRequest request) {
+    private DocSerdeRequest(DocWriteRequest request) {
         this.request = request;
     }
 
-    public DocWriteRequest getRequest() {
+    public DocWriteRequest getRequest() {
         return request;
     }
 
-    static  DocSerdeRequest from(DocWriteRequest request) {
-        return new DocSerdeRequest<>(request);
+    static  DocSerdeRequest from(DocWriteRequest request) {
+        return new DocSerdeRequest(request);
     }
 
-    static DocSerdeRequest readFrom(long requestSize, DataInputStream in) throws IOException {
+    static DocSerdeRequest readFrom(long requestSize, DataInputStream in) throws IOException {
         try (final StreamInput stream = new InputStreamStreamInput(in, requestSize)) {
-            return new DocSerdeRequest<>(readDocumentRequest(stream));
+            return new DocSerdeRequest(readDocumentRequest(stream));
         }
     }
 
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
index 6e062b3..03ff4e1 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
@@ -44,7 +44,7 @@
  * @see OpensearchAsyncSinkBuilder on how to construct a OpensearchAsyncSink
  */
 @PublicEvolving
-public class OpensearchAsyncSink extends AsyncSinkBase> {
+public class OpensearchAsyncSink extends AsyncSinkBase {
     private static final long serialVersionUID = 1L;
 
     private final List hosts;
@@ -77,7 +77,7 @@ public class OpensearchAsyncSink extends AsyncSinkBase> elementConverter,
+            ElementConverter elementConverter,
             List hosts,
             NetworkClientConfig networkClientConfig) {
         super(
@@ -105,7 +105,7 @@ public static  OpensearchAsyncSinkBuilder builder() {
 
     @Internal
     @Override
-    public StatefulSinkWriter>> createWriter(
+    public StatefulSinkWriter> createWriter(
             InitContext context) throws IOException {
         return new OpensearchAsyncWriter<>(
                 context,
@@ -123,9 +123,8 @@ public StatefulSinkWriter>> crea
 
     @Internal
     @Override
-    public StatefulSinkWriter>> restoreWriter(
-            InitContext context,
-            Collection>> recoveredState)
+    public StatefulSinkWriter> restoreWriter(
+            InitContext context, Collection> recoveredState)
             throws IOException {
         return new OpensearchAsyncWriter<>(
                 context,
@@ -143,7 +142,7 @@ public StatefulSinkWriter>> rest
 
     @Internal
     @Override
-    public SimpleVersionedSerializer>>
+    public SimpleVersionedSerializer>
             getWriterStateSerializer() {
         return new OpensearchWriterStateSerializer();
     }
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
index adbff7f..f6bb89e 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
@@ -51,8 +51,7 @@
  */
 @PublicEvolving
 public class OpensearchAsyncSinkBuilder
-        extends AsyncSinkBaseBuilder<
-                InputT, DocSerdeRequest, OpensearchAsyncSinkBuilder> {
+        extends AsyncSinkBaseBuilder> {
     private List hosts;
     private String username;
     private String password;
@@ -61,7 +60,7 @@ public class OpensearchAsyncSinkBuilder
     private Integer connectionRequestTimeout;
     private Integer socketTimeout;
     private Boolean allowInsecure;
-    private ElementConverter> elementConverter;
+    private ElementConverter elementConverter;
 
     /**
      * Sets the element converter.
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
index 2bbbc17..7f17c68 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
@@ -70,7 +70,7 @@
  *     DocSerdeRequest})
  */
 @Internal
-class OpensearchAsyncWriter extends AsyncSinkWriter> {
+class OpensearchAsyncWriter extends AsyncSinkWriter {
     private static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncWriter.class);
 
     private final RestHighLevelClient client;
@@ -112,7 +112,7 @@ class OpensearchAsyncWriter extends AsyncSinkWriter> elementConverter,
+            ElementConverter elementConverter,
             int maxBatchSize,
             int maxInFlightRequests,
             int maxBufferedRequests,
@@ -121,7 +121,7 @@ class OpensearchAsyncWriter extends AsyncSinkWriter hosts,
             NetworkClientConfig networkClientConfig,
-            Collection>> initialStates) {
+            Collection> initialStates) {
         super(
                 elementConverter,
                 context,
@@ -149,8 +149,7 @@ class OpensearchAsyncWriter extends AsyncSinkWriter> requestEntries,
-            Consumer>> requestResult) {
+            List requestEntries, Consumer> requestResult) {
 
         BulkRequest bulkRequest = new BulkRequest();
         requestEntries.forEach(r -> bulkRequest.add(r.getRequest()));
@@ -184,7 +183,7 @@ public void onFailure(Exception e) {
     }
 
     @Override
-    protected long getSizeInBytes(DocSerdeRequest requestEntry) {
+    protected long getSizeInBytes(DocSerdeRequest requestEntry) {
         return requestEntry.getRequest().ramBytesUsed();
     }
 
@@ -211,8 +210,8 @@ private boolean isRetryable(Throwable err) {
 
     private void handleFullyFailedBulkRequest(
             Throwable err,
-            List> requestEntries,
-            Consumer>> requestResult) {
+            List requestEntries,
+            Consumer> requestResult) {
         final boolean retryable = isRetryable(err.getCause());
 
         LOG.warn(
@@ -230,10 +229,10 @@ private void handleFullyFailedBulkRequest(
 
     private void handlePartiallyFailedBulkRequests(
             BulkResponse response,
-            List> requestEntries,
-            Consumer>> requestResult) {
+            List requestEntries,
+            Consumer> requestResult) {
 
-        final List> failedRequestEntries = new ArrayList<>();
+        final List failedRequestEntries = new ArrayList<>();
         final BulkItemResponse[] items = response.getItems();
 
         for (int i = 0; i < items.length; i++) {
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
index e41dcaa..381f872 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
@@ -30,15 +30,15 @@
  * DocWriteRequest}), wrapped into {@link DocSerdeRequest}.
  */
 @Internal
-class OpensearchWriterStateSerializer extends AsyncSinkWriterStateSerializer> {
+class OpensearchWriterStateSerializer extends AsyncSinkWriterStateSerializer {
     @Override
-    protected void serializeRequestToStream(DocSerdeRequest request, DataOutputStream out)
+    protected void serializeRequestToStream(DocSerdeRequest request, DataOutputStream out)
             throws IOException {
         request.writeTo(out);
     }
 
     @Override
-    protected DocSerdeRequest deserializeRequestFromStream(long requestSize, DataInputStream in)
+    protected DocSerdeRequest deserializeRequestFromStream(long requestSize, DataInputStream in)
             throws IOException {
         return DocSerdeRequest.readFrom(requestSize, in);
     }
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
index 313f85e..f507551 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
@@ -76,7 +76,7 @@ class OpensearchAsyncWriterITCase {
 
     private final Lock lock = new ReentrantLock();
     private final Condition completed = lock.newCondition();
-    private final List> requests = new ArrayList<>();
+    private final List requests = new ArrayList<>();
 
     @BeforeEach
     void setUp() {
@@ -270,8 +270,8 @@ private OpensearchAsyncWriter> createWriter(
                 Collections.emptyList()) {
             @Override
             protected void submitRequestEntries(
-                    List> requestEntries,
-                    Consumer>> requestResult) {
+                    List requestEntries,
+                    Consumer> requestResult) {
                 super.submitRequestEntries(
                         requestEntries,
                         (entries) -> {
@@ -290,7 +290,7 @@ protected void submitRequestEntries(
     }
 
     private static class UpdatingElementConverter
-            implements ElementConverter, DocSerdeRequest> {
+            implements ElementConverter, DocSerdeRequest> {
         private static final long serialVersionUID = 1L;
 
         private final String dataFieldName;
@@ -302,7 +302,7 @@ private static class UpdatingElementConverter
         }
 
         @Override
-        public DocSerdeRequest apply(Tuple2 element, Context context) {
+        public DocSerdeRequest apply(Tuple2 element, Context context) {
             Map document = new HashMap<>();
             document.put(dataFieldName, element.f1);
 
diff --git a/pom.xml b/pom.xml
index 015f2d7..079703b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@ under the License.
 	
 
 	
-		1.16.1
+		1.16.0
 		15.0
 		
 		2.13.4.20221013

From a0299851c58f71af547e60c40d5976ab26c1dc9c Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Fri, 10 Feb 2023 15:06:34 -0500
Subject: [PATCH 6/8] Addressing code review comments

Signed-off-by: Andriy Redko 
---
 .../sink/OpensearchAsyncSinkBuilder.java      | 31 ++++---
 .../opensearch/sink/DocSerdeRequestTest.java  | 92 +++++++++++++++++++
 2 files changed, 109 insertions(+), 14 deletions(-)
 create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java

diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
index f6bb89e..8727de3 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
@@ -52,6 +52,17 @@
 @PublicEvolving
 public class OpensearchAsyncSinkBuilder
         extends AsyncSinkBaseBuilder> {
+    private static final int DEFAULT_MAX_RECORD_SIZE_BYTES = 1 * 1024 * 1024; /* 1Mb */
+    // Source: OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION
+    private static final int DEFAULT_MAX_TIME_IN_BUFFER_MS = 1000;
+    // Source: OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION
+    private static final int DEFAULT_MAX_BATCH_SIZE_BYTES = 2 * 1024 * 1024; /* 2Mb */
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    // Source: BulkProcessor::concurrentRequests
+    private static final int DEFAULT_MAX_INFLIGHT_REQUESTS = 1;
+    // Source: OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION
+    private static final int DEFAULT_BULK_FLUSH_MAX_ACTIONS = 1000;
+
     private List hosts;
     private String username;
     private String password;
@@ -205,20 +216,12 @@ public OpensearchAsyncSinkBuilder setAllowInsecure(boolean allowInsecure
     @Override
     public OpensearchAsyncSink build() {
         return new OpensearchAsyncSink(
-                nonNullOrDefault(
-                        getMaxBatchSize(),
-                        1000), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION */
-                nonNullOrDefault(
-                        getMaxInFlightRequests(), 1), /* BulkProcessor::concurrentRequests */
-                nonNullOrDefault(getMaxBufferedRequests(), 10000),
-                nonNullOrDefault(
-                        getMaxBatchSizeInBytes(),
-                        2 * 1024
-                                * 1024), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION */
-                nonNullOrDefault(
-                        getMaxTimeInBufferMS(),
-                        1000), /* OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION */
-                nonNullOrDefault(getMaxRecordSizeInBytes(), 1 * 1024 * 1024), /* 1Mb */
+                nonNullOrDefault(getMaxBatchSize(), DEFAULT_BULK_FLUSH_MAX_ACTIONS),
+                nonNullOrDefault(getMaxInFlightRequests(), DEFAULT_MAX_INFLIGHT_REQUESTS),
+                nonNullOrDefault(getMaxBufferedRequests(), DEFAULT_MAX_BUFFERED_REQUESTS),
+                nonNullOrDefault(getMaxBatchSizeInBytes(), DEFAULT_MAX_BATCH_SIZE_BYTES),
+                nonNullOrDefault(getMaxTimeInBufferMS(), DEFAULT_MAX_TIME_IN_BUFFER_MS),
+                nonNullOrDefault(getMaxRecordSizeInBytes(), DEFAULT_MAX_RECORD_SIZE_BYTES),
                 elementConverter,
                 hosts,
                 buildNetworkClientConfig());
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java
new file mode 100644
index 0000000..c605647
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+class DocSerdeRequestTest {
+    @ParameterizedTest
+    @MethodSource("requests")
+    void serde(DocWriteRequest request) throws IOException {
+        final DocSerdeRequest serialized = DocSerdeRequest.from(request);
+
+        try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream()) {
+            try (final DataOutputStream out = new DataOutputStream(bytes)) {
+                serialized.writeTo(out);
+            }
+
+            try (final DataInputStream in =
+                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+                final DocSerdeRequest deserialized = DocSerdeRequest.readFrom(Byte.MAX_VALUE, in);
+                assertThat(deserialized.getRequest())
+                        .usingRecursiveComparison(
+                                RecursiveComparisonConfiguration.builder()
+                                        /* ignoring 'type', it is deprecated but backfilled for 1.x compatibility */
+                                        .withIgnoredFields("type", "doc.type")
+                                        .build())
+                        .isEqualTo(serialized.getRequest());
+            }
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void unsupportedRequestType() throws IOException {
+        final DocSerdeRequest serialized = DocSerdeRequest.from(mock(DocWriteRequest.class));
+        try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream()) {
+            try (final DataOutputStream out = new DataOutputStream(bytes)) {
+                assertThatThrownBy(() -> serialized.writeTo(out))
+                        .isInstanceOf(IllegalStateException.class);
+            }
+        }
+    }
+
+    private static Stream requests() {
+        return Stream.of(
+                Arguments.of(new DeleteRequest("index").id("id")),
+                Arguments.of(
+                        new UpdateRequest()
+                                .index("index")
+                                .id("id")
+                                .doc(Collections.singletonMap("action", "update"))),
+                Arguments.of(
+                        new IndexRequest("index")
+                                .id("id")
+                                .source(Collections.singletonMap("action", "index"))));
+    }
+}

From 0d1c95737ae21b5bbd165f61984d77ea1c1e0022 Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Mon, 13 Feb 2023 08:35:13 -0500
Subject: [PATCH 7/8] Addressing code review comments

Signed-off-by: Andriy Redko 
---
 .github/workflows/push_pr.yml                 |   2 +-
 .../sink/OpensearchAsyncWriter.java           |  78 +-----------
 .../sink/OpensearchRestClientCreator.java     | 115 ++++++++++++++++++
 .../opensearch/sink/OpensearchWriter.java     |  77 +-----------
 pom.xml                                       |   2 +-
 5 files changed, 119 insertions(+), 155 deletions(-)
 create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index e1ba2d4..e3a7f8b 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -25,4 +25,4 @@ jobs:
   compile_and_test:
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
-      flink_version: 1.16.0
+      flink_version: 1.16.1
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
index 7f17c68..accf2a5 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
@@ -29,20 +29,12 @@
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
 import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.conn.ssl.TrustAllStrategy;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.ssl.SSLContexts;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.bulk.BulkItemResponse;
 import org.opensearch.action.bulk.BulkRequest;
 import org.opensearch.action.bulk.BulkResponse;
 import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
 import org.opensearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,9 +42,6 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -135,12 +124,7 @@ class OpensearchAsyncWriter extends AsyncSinkWriter {
-                    if (networkClientConfig.getPassword() != null
-                            && networkClientConfig.getUsername() != null) {
-                        final CredentialsProvider credentialsProvider =
-                                new BasicCredentialsProvider();
-                        credentialsProvider.setCredentials(
-                                AuthScope.ANY,
-                                new UsernamePasswordCredentials(
-                                        networkClientConfig.getUsername(),
-                                        networkClientConfig.getPassword()));
-
-                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                    }
-
-                    if (networkClientConfig.isAllowInsecure().orElse(false)) {
-                        try {
-                            httpClientBuilder.setSSLContext(
-                                    SSLContexts.custom()
-                                            .loadTrustMaterial(new TrustAllStrategy())
-                                            .build());
-                        } catch (final NoSuchAlgorithmException
-                                | KeyStoreException
-                                | KeyManagementException ex) {
-                            throw new IllegalStateException(
-                                    "Unable to create custom SSL context", ex);
-                        }
-                    }
-
-                    return httpClientBuilder;
-                });
-        if (networkClientConfig.getConnectionRequestTimeout() != null
-                || networkClientConfig.getConnectionTimeout() != null
-                || networkClientConfig.getSocketTimeout() != null) {
-            builder.setRequestConfigCallback(
-                    requestConfigBuilder -> {
-                        if (networkClientConfig.getConnectionRequestTimeout() != null) {
-                            requestConfigBuilder.setConnectionRequestTimeout(
-                                    networkClientConfig.getConnectionRequestTimeout());
-                        }
-                        if (networkClientConfig.getConnectionTimeout() != null) {
-                            requestConfigBuilder.setConnectTimeout(
-                                    networkClientConfig.getConnectionTimeout());
-                        }
-                        if (networkClientConfig.getSocketTimeout() != null) {
-                            requestConfigBuilder.setSocketTimeout(
-                                    networkClientConfig.getSocketTimeout());
-                        }
-                        return requestConfigBuilder;
-                    });
-        }
-        return builder;
-    }
 }
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java
new file mode 100644
index 0000000..22784db
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContexts;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+
+/** The utility class to encapsulate {@link RestHighLevelClient} creation. */
+class OpensearchRestClientCreator {
+    /** Utility class. */
+    private OpensearchRestClientCreator() {}
+
+    /**
+     * Creates new instance of {@link RestHighLevelClient}.
+     *
+     * @param hosts list of hosts to connect
+     * @param networkClientConfig client network configuration
+     * @return new instance of {@link RestHighLevelClient}
+     */
+    static RestHighLevelClient create(
+            final List hosts, final NetworkClientConfig networkClientConfig) {
+        return new RestHighLevelClient(
+                configureRestClientBuilder(
+                        RestClient.builder(hosts.toArray(new HttpHost[0])), networkClientConfig));
+    }
+
+    private static RestClientBuilder configureRestClientBuilder(
+            RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
+        if (networkClientConfig.getConnectionPathPrefix() != null) {
+            builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
+        }
+
+        builder.setHttpClientConfigCallback(
+                httpClientBuilder -> {
+                    if (networkClientConfig.getPassword() != null
+                            && networkClientConfig.getUsername() != null) {
+                        final CredentialsProvider credentialsProvider =
+                                new BasicCredentialsProvider();
+                        credentialsProvider.setCredentials(
+                                AuthScope.ANY,
+                                new UsernamePasswordCredentials(
+                                        networkClientConfig.getUsername(),
+                                        networkClientConfig.getPassword()));
+
+                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    }
+
+                    if (networkClientConfig.isAllowInsecure().orElse(false)) {
+                        try {
+                            httpClientBuilder.setSSLContext(
+                                    SSLContexts.custom()
+                                            .loadTrustMaterial(new TrustAllStrategy())
+                                            .build());
+                        } catch (final NoSuchAlgorithmException
+                                | KeyStoreException
+                                | KeyManagementException ex) {
+                            throw new IllegalStateException(
+                                    "Unable to create custom SSL context", ex);
+                        }
+                    }
+
+                    return httpClientBuilder;
+                });
+        if (networkClientConfig.getConnectionRequestTimeout() != null
+                || networkClientConfig.getConnectionTimeout() != null
+                || networkClientConfig.getSocketTimeout() != null) {
+            builder.setRequestConfigCallback(
+                    requestConfigBuilder -> {
+                        if (networkClientConfig.getConnectionRequestTimeout() != null) {
+                            requestConfigBuilder.setConnectionRequestTimeout(
+                                    networkClientConfig.getConnectionRequestTimeout());
+                        }
+                        if (networkClientConfig.getConnectionTimeout() != null) {
+                            requestConfigBuilder.setConnectTimeout(
+                                    networkClientConfig.getConnectionTimeout());
+                        }
+                        if (networkClientConfig.getSocketTimeout() != null) {
+                            requestConfigBuilder.setSocketTimeout(
+                                    networkClientConfig.getSocketTimeout());
+                        }
+                        return requestConfigBuilder;
+                    });
+        }
+        return builder;
+    }
+}
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
index 3231b28..1b0d216 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
@@ -27,12 +27,6 @@
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.conn.ssl.TrustAllStrategy;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.ssl.SSLContexts;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
 import org.opensearch.action.bulk.BackoffPolicy;
@@ -44,8 +38,6 @@
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.update.UpdateRequest;
 import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
 import org.opensearch.client.RestHighLevelClient;
 import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.common.unit.ByteSizeValue;
@@ -55,9 +47,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
 import java.util.List;
 
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -107,11 +96,7 @@ class OpensearchWriter implements SinkWriter {
         this.emitter = checkNotNull(emitter);
         this.flushOnCheckpoint = flushOnCheckpoint;
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
-        this.client =
-                new RestHighLevelClient(
-                        configureRestClientBuilder(
-                                RestClient.builder(hosts.toArray(new HttpHost[0])),
-                                networkClientConfig));
+        this.client = OpensearchRestClientCreator.create(hosts, networkClientConfig);
         this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
         this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
         checkNotNull(metricGroup);
@@ -161,66 +146,6 @@ public void close() throws Exception {
         client.close();
     }
 
-    private static RestClientBuilder configureRestClientBuilder(
-            RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
-        if (networkClientConfig.getConnectionPathPrefix() != null) {
-            builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
-        }
-
-        builder.setHttpClientConfigCallback(
-                httpClientBuilder -> {
-                    if (networkClientConfig.getPassword() != null
-                            && networkClientConfig.getUsername() != null) {
-                        final CredentialsProvider credentialsProvider =
-                                new BasicCredentialsProvider();
-                        credentialsProvider.setCredentials(
-                                AuthScope.ANY,
-                                new UsernamePasswordCredentials(
-                                        networkClientConfig.getUsername(),
-                                        networkClientConfig.getPassword()));
-
-                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                    }
-
-                    if (networkClientConfig.isAllowInsecure().orElse(false)) {
-                        try {
-                            httpClientBuilder.setSSLContext(
-                                    SSLContexts.custom()
-                                            .loadTrustMaterial(new TrustAllStrategy())
-                                            .build());
-                        } catch (final NoSuchAlgorithmException
-                                | KeyStoreException
-                                | KeyManagementException ex) {
-                            throw new IllegalStateException(
-                                    "Unable to create custom SSL context", ex);
-                        }
-                    }
-
-                    return httpClientBuilder;
-                });
-        if (networkClientConfig.getConnectionRequestTimeout() != null
-                || networkClientConfig.getConnectionTimeout() != null
-                || networkClientConfig.getSocketTimeout() != null) {
-            builder.setRequestConfigCallback(
-                    requestConfigBuilder -> {
-                        if (networkClientConfig.getConnectionRequestTimeout() != null) {
-                            requestConfigBuilder.setConnectionRequestTimeout(
-                                    networkClientConfig.getConnectionRequestTimeout());
-                        }
-                        if (networkClientConfig.getConnectionTimeout() != null) {
-                            requestConfigBuilder.setConnectTimeout(
-                                    networkClientConfig.getConnectionTimeout());
-                        }
-                        if (networkClientConfig.getSocketTimeout() != null) {
-                            requestConfigBuilder.setSocketTimeout(
-                                    networkClientConfig.getSocketTimeout());
-                        }
-                        return requestConfigBuilder;
-                    });
-        }
-        return builder;
-    }
-
     private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) {
 
         final BulkProcessor.Builder builder =
diff --git a/pom.xml b/pom.xml
index 079703b..015f2d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@ under the License.
 	
 
 	
-		1.16.0
+		1.16.1
 		15.0
 		
 		2.13.4.20221013

From 4cd6423d5eeb476993155fff1d8f46ad7cb99254 Mon Sep 17 00:00:00 2001
From: Andriy Redko 
Date: Fri, 3 Mar 2023 10:59:27 -0500
Subject: [PATCH 8/8] Address code review comments

Signed-off-by: Andriy Redko 
---
 .../docs/connectors/datastream/opensearch.md  |  79 ++++++++++++
 .../opensearch/sink/DocSerdeRequestTest.java  | 113 +++++++++++++++++-
 2 files changed, 189 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/opensearch.md b/docs/content/docs/connectors/datastream/opensearch.md
index 8150b8b..ed32945 100644
--- a/docs/content/docs/connectors/datastream/opensearch.md
+++ b/docs/content/docs/connectors/datastream/opensearch.md
@@ -162,6 +162,85 @@ This will buffer elements before sending them in bulk to the cluster. The `BulkP
 executes bulk requests one at a time, i.e. there will be no two concurrent
 flushes of the buffered actions in progress.
 
+## Opensearch AsyncSink
+
+The example below shows how to configure and create a AsyncSink (see please [FLIP-171](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink)):
+
+{{< tabs "b1732edd-4218-470e-adad-b1ebb4021a1b" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream input = ...;
+
+input.sinkTo(
+    OpensearchAsyncSink.builder()
+        .setHosts(new HttpHost("localhost", 9200, "http"))
+        .setElementConverter((element: String, context: SinkWriter.Context) -> createIndexRequest(element))
+        .build());
+
+
+private static IndexRequest createIndexRequest(String element) {
+    Map json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchAsyncSink, RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  OpensearchAsyncSink[String]
+    .builder()
+    .setMaxBatchSize(1) // Instructs the AsyncSink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setElementConverter((element: String, context: SinkWriter.Context) => createIndexRequest(element))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `ElementConverter`
+can be used to produce the requests of different types (ex.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch AsyncSink uses
+a `RestHighLevelClient::bulkAsync` to send action requests to the cluster.
+
 ### Opensearch Sinks and Fault Tolerance
 
 With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java
index c605647..ad2dda9 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequestTest.java
@@ -25,7 +25,9 @@
 import org.opensearch.action.DocWriteRequest;
 import org.opensearch.action.delete.DeleteRequest;
 import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.support.IndicesOptions;
 import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.index.VersionType;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,7 +39,6 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.Mockito.mock;
 
 class DocSerdeRequestTest {
     @ParameterizedTest
@@ -65,9 +66,8 @@ void serde(DocWriteRequest request) throws IOException {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     void unsupportedRequestType() throws IOException {
-        final DocSerdeRequest serialized = DocSerdeRequest.from(mock(DocWriteRequest.class));
+        final DocSerdeRequest serialized = DocSerdeRequest.from(new DummyDocWriteRequest());
         try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream()) {
             try (final DataOutputStream out = new DataOutputStream(bytes)) {
                 assertThatThrownBy(() -> serialized.writeTo(out))
@@ -89,4 +89,111 @@ private static Stream requests() {
                                 .id("id")
                                 .source(Collections.singletonMap("action", "index"))));
     }
+
+    private static class DummyDocWriteRequest implements DocWriteRequest {
+        @Override
+        public String[] indices() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object index(String index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String index() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object type(String type) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String type() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object defaultTypeIfNull(String defaultType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String id() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object routing(String routing) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String routing() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long version() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object version(long version) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public VersionType versionType() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object versionType(VersionType versionType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object setIfSeqNo(long seqNo) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object setIfPrimaryTerm(long term) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long ifSeqNo() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long ifPrimaryTerm() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public OpType opType() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isRequireAlias() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }