Skip to content

Commit bf1e2de

Browse files
committed
Addressing code review comments
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent dce9c3b commit bf1e2de

File tree

1 file changed

+35
-1
lines changed

1 file changed

+35
-1
lines changed

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.connector.sink2.Sink;
23+
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
2324
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
2425
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
2526
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -34,6 +35,7 @@
3435
import org.apache.http.conn.ssl.TrustAllStrategy;
3536
import org.apache.http.impl.client.BasicCredentialsProvider;
3637
import org.apache.http.ssl.SSLContexts;
38+
import org.opensearch.OpenSearchException;
3739
import org.opensearch.action.ActionListener;
3840
import org.opensearch.action.bulk.BulkItemResponse;
3941
import org.opensearch.action.bulk.BulkRequest;
@@ -46,6 +48,8 @@
4648
import org.slf4j.LoggerFactory;
4749

4850
import java.io.IOException;
51+
import java.net.ConnectException;
52+
import java.net.NoRouteToHostException;
4953
import java.security.KeyManagementException;
5054
import java.security.KeyStoreException;
5155
import java.security.NoSuchAlgorithmException;
@@ -73,6 +77,17 @@ class OpensearchAsyncWriter<InputT> extends AsyncSinkWriter<InputT, DocSerdeRequ
7377
private final Counter numRecordsOutErrorsCounter;
7478
private volatile boolean closed = false;
7579

80+
private static final FatalExceptionClassifier OPENSEARCH_FATAL_EXCEPTION_CLASSIFIER =
81+
FatalExceptionClassifier.createChain(
82+
new FatalExceptionClassifier(
83+
err ->
84+
err instanceof NoRouteToHostException
85+
|| err instanceof ConnectException,
86+
err ->
87+
new OpenSearchException(
88+
"Could not connect to Opensearch cluster using provided hosts",
89+
err)));
90+
7691
/**
7792
* Constructor creating an Opensearch async writer.
7893
*
@@ -186,12 +201,31 @@ public void close() {
186201
}
187202
}
188203

204+
private boolean isRetryable(Throwable err) {
205+
// isFatal() is really isNotFatal()
206+
if (!OPENSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
207+
return false;
208+
}
209+
return true;
210+
}
211+
189212
private void handleFullyFailedBulkRequest(
190213
Throwable err,
191214
List<DocSerdeRequest<?>> requestEntries,
192215
Consumer<List<DocSerdeRequest<?>>> requestResult) {
216+
final boolean retryable = isRetryable(err.getCause());
217+
218+
LOG.warn(
219+
"Opensearch AsyncWwriter failed to persist {} entries (retryable = {})",
220+
requestEntries.size(),
221+
retryable,
222+
err);
223+
193224
numRecordsOutErrorsCounter.inc(requestEntries.size());
194-
requestResult.accept(requestEntries);
225+
226+
if (retryable) {
227+
requestResult.accept(requestEntries);
228+
}
195229
}
196230

197231
private void handlePartiallyFailedBulkRequests(

0 commit comments

Comments
 (0)