Skip to content

Commit 455048c

Browse files
committed
Addressing code review comments
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent 317dcac commit 455048c

File tree

5 files changed

+119
-155
lines changed

5 files changed

+119
-155
lines changed

.github/workflows/push_pr.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ jobs:
2525
compile_and_test:
2626
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
2727
with:
28-
flink_version: 1.16.0
28+
flink_version: 1.16.1

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java

+1-77
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,19 @@
2929
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
3030

3131
import org.apache.http.HttpHost;
32-
import org.apache.http.auth.AuthScope;
33-
import org.apache.http.auth.UsernamePasswordCredentials;
34-
import org.apache.http.client.CredentialsProvider;
35-
import org.apache.http.conn.ssl.TrustAllStrategy;
36-
import org.apache.http.impl.client.BasicCredentialsProvider;
37-
import org.apache.http.ssl.SSLContexts;
3832
import org.opensearch.OpenSearchException;
3933
import org.opensearch.action.ActionListener;
4034
import org.opensearch.action.bulk.BulkItemResponse;
4135
import org.opensearch.action.bulk.BulkRequest;
4236
import org.opensearch.action.bulk.BulkResponse;
4337
import org.opensearch.client.RequestOptions;
44-
import org.opensearch.client.RestClient;
45-
import org.opensearch.client.RestClientBuilder;
4638
import org.opensearch.client.RestHighLevelClient;
4739
import org.slf4j.Logger;
4840
import org.slf4j.LoggerFactory;
4941

5042
import java.io.IOException;
5143
import java.net.ConnectException;
5244
import java.net.NoRouteToHostException;
53-
import java.security.KeyManagementException;
54-
import java.security.KeyStoreException;
55-
import java.security.NoSuchAlgorithmException;
5645
import java.util.ArrayList;
5746
import java.util.Collection;
5847
import java.util.Collections;
@@ -135,12 +124,7 @@ class OpensearchAsyncWriter<InputT> extends AsyncSinkWriter<InputT, DocSerdeRequ
135124
.build(),
136125
initialStates);
137126

138-
this.client =
139-
new RestHighLevelClient(
140-
configureRestClientBuilder(
141-
RestClient.builder(hosts.toArray(new HttpHost[0])),
142-
networkClientConfig));
143-
127+
this.client = OpensearchRestClientCreator.create(hosts, networkClientConfig);
144128
final SinkWriterMetricGroup metricGroup = context.metricGroup();
145129
checkNotNull(metricGroup);
146130

@@ -244,64 +228,4 @@ private void handlePartiallyFailedBulkRequests(
244228
numRecordsOutErrorsCounter.inc(failedRequestEntries.size());
245229
requestResult.accept(failedRequestEntries);
246230
}
247-
248-
private static RestClientBuilder configureRestClientBuilder(
249-
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
250-
if (networkClientConfig.getConnectionPathPrefix() != null) {
251-
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
252-
}
253-
254-
builder.setHttpClientConfigCallback(
255-
httpClientBuilder -> {
256-
if (networkClientConfig.getPassword() != null
257-
&& networkClientConfig.getUsername() != null) {
258-
final CredentialsProvider credentialsProvider =
259-
new BasicCredentialsProvider();
260-
credentialsProvider.setCredentials(
261-
AuthScope.ANY,
262-
new UsernamePasswordCredentials(
263-
networkClientConfig.getUsername(),
264-
networkClientConfig.getPassword()));
265-
266-
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
267-
}
268-
269-
if (networkClientConfig.isAllowInsecure().orElse(false)) {
270-
try {
271-
httpClientBuilder.setSSLContext(
272-
SSLContexts.custom()
273-
.loadTrustMaterial(new TrustAllStrategy())
274-
.build());
275-
} catch (final NoSuchAlgorithmException
276-
| KeyStoreException
277-
| KeyManagementException ex) {
278-
throw new IllegalStateException(
279-
"Unable to create custom SSL context", ex);
280-
}
281-
}
282-
283-
return httpClientBuilder;
284-
});
285-
if (networkClientConfig.getConnectionRequestTimeout() != null
286-
|| networkClientConfig.getConnectionTimeout() != null
287-
|| networkClientConfig.getSocketTimeout() != null) {
288-
builder.setRequestConfigCallback(
289-
requestConfigBuilder -> {
290-
if (networkClientConfig.getConnectionRequestTimeout() != null) {
291-
requestConfigBuilder.setConnectionRequestTimeout(
292-
networkClientConfig.getConnectionRequestTimeout());
293-
}
294-
if (networkClientConfig.getConnectionTimeout() != null) {
295-
requestConfigBuilder.setConnectTimeout(
296-
networkClientConfig.getConnectionTimeout());
297-
}
298-
if (networkClientConfig.getSocketTimeout() != null) {
299-
requestConfigBuilder.setSocketTimeout(
300-
networkClientConfig.getSocketTimeout());
301-
}
302-
return requestConfigBuilder;
303-
});
304-
}
305-
return builder;
306-
}
307231
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.opensearch.sink;
20+
21+
import org.apache.http.HttpHost;
22+
import org.apache.http.auth.AuthScope;
23+
import org.apache.http.auth.UsernamePasswordCredentials;
24+
import org.apache.http.client.CredentialsProvider;
25+
import org.apache.http.conn.ssl.TrustAllStrategy;
26+
import org.apache.http.impl.client.BasicCredentialsProvider;
27+
import org.apache.http.ssl.SSLContexts;
28+
import org.opensearch.client.RestClient;
29+
import org.opensearch.client.RestClientBuilder;
30+
import org.opensearch.client.RestHighLevelClient;
31+
32+
import java.security.KeyManagementException;
33+
import java.security.KeyStoreException;
34+
import java.security.NoSuchAlgorithmException;
35+
import java.util.List;
36+
37+
/** The utility class to encapsulate {@link RestHighLevelClient} creation. */
38+
class OpensearchRestClientCreator {
39+
/** Utility class. */
40+
private OpensearchRestClientCreator() {}
41+
42+
/**
43+
* Creates new instance of {@link RestHighLevelClient}.
44+
*
45+
* @param hosts list of hosts to connect
46+
* @param networkClientConfig client network configuration
47+
* @return new instance of {@link RestHighLevelClient}
48+
*/
49+
static RestHighLevelClient create(
50+
final List<HttpHost> hosts, final NetworkClientConfig networkClientConfig) {
51+
return new RestHighLevelClient(
52+
configureRestClientBuilder(
53+
RestClient.builder(hosts.toArray(new HttpHost[0])), networkClientConfig));
54+
}
55+
56+
private static RestClientBuilder configureRestClientBuilder(
57+
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
58+
if (networkClientConfig.getConnectionPathPrefix() != null) {
59+
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
60+
}
61+
62+
builder.setHttpClientConfigCallback(
63+
httpClientBuilder -> {
64+
if (networkClientConfig.getPassword() != null
65+
&& networkClientConfig.getUsername() != null) {
66+
final CredentialsProvider credentialsProvider =
67+
new BasicCredentialsProvider();
68+
credentialsProvider.setCredentials(
69+
AuthScope.ANY,
70+
new UsernamePasswordCredentials(
71+
networkClientConfig.getUsername(),
72+
networkClientConfig.getPassword()));
73+
74+
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
75+
}
76+
77+
if (networkClientConfig.isAllowInsecure().orElse(false)) {
78+
try {
79+
httpClientBuilder.setSSLContext(
80+
SSLContexts.custom()
81+
.loadTrustMaterial(new TrustAllStrategy())
82+
.build());
83+
} catch (final NoSuchAlgorithmException
84+
| KeyStoreException
85+
| KeyManagementException ex) {
86+
throw new IllegalStateException(
87+
"Unable to create custom SSL context", ex);
88+
}
89+
}
90+
91+
return httpClientBuilder;
92+
});
93+
if (networkClientConfig.getConnectionRequestTimeout() != null
94+
|| networkClientConfig.getConnectionTimeout() != null
95+
|| networkClientConfig.getSocketTimeout() != null) {
96+
builder.setRequestConfigCallback(
97+
requestConfigBuilder -> {
98+
if (networkClientConfig.getConnectionRequestTimeout() != null) {
99+
requestConfigBuilder.setConnectionRequestTimeout(
100+
networkClientConfig.getConnectionRequestTimeout());
101+
}
102+
if (networkClientConfig.getConnectionTimeout() != null) {
103+
requestConfigBuilder.setConnectTimeout(
104+
networkClientConfig.getConnectionTimeout());
105+
}
106+
if (networkClientConfig.getSocketTimeout() != null) {
107+
requestConfigBuilder.setSocketTimeout(
108+
networkClientConfig.getSocketTimeout());
109+
}
110+
return requestConfigBuilder;
111+
});
112+
}
113+
return builder;
114+
}
115+
}

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java

+1-76
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@
2727
import org.apache.flink.util.function.ThrowingRunnable;
2828

2929
import org.apache.http.HttpHost;
30-
import org.apache.http.auth.AuthScope;
31-
import org.apache.http.auth.UsernamePasswordCredentials;
32-
import org.apache.http.client.CredentialsProvider;
33-
import org.apache.http.conn.ssl.TrustAllStrategy;
34-
import org.apache.http.impl.client.BasicCredentialsProvider;
35-
import org.apache.http.ssl.SSLContexts;
3630
import org.opensearch.action.ActionListener;
3731
import org.opensearch.action.DocWriteRequest;
3832
import org.opensearch.action.bulk.BackoffPolicy;
@@ -44,8 +38,6 @@
4438
import org.opensearch.action.index.IndexRequest;
4539
import org.opensearch.action.update.UpdateRequest;
4640
import org.opensearch.client.RequestOptions;
47-
import org.opensearch.client.RestClient;
48-
import org.opensearch.client.RestClientBuilder;
4941
import org.opensearch.client.RestHighLevelClient;
5042
import org.opensearch.common.unit.ByteSizeUnit;
5143
import org.opensearch.common.unit.ByteSizeValue;
@@ -55,9 +47,6 @@
5547
import org.slf4j.LoggerFactory;
5648

5749
import java.io.IOException;
58-
import java.security.KeyManagementException;
59-
import java.security.KeyStoreException;
60-
import java.security.NoSuchAlgorithmException;
6150
import java.util.List;
6251

6352
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -107,11 +96,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
10796
this.emitter = checkNotNull(emitter);
10897
this.flushOnCheckpoint = flushOnCheckpoint;
10998
this.mailboxExecutor = checkNotNull(mailboxExecutor);
110-
this.client =
111-
new RestHighLevelClient(
112-
configureRestClientBuilder(
113-
RestClient.builder(hosts.toArray(new HttpHost[0])),
114-
networkClientConfig));
99+
this.client = OpensearchRestClientCreator.create(hosts, networkClientConfig);
115100
this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
116101
this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
117102
checkNotNull(metricGroup);
@@ -161,66 +146,6 @@ public void close() throws Exception {
161146
client.close();
162147
}
163148

164-
private static RestClientBuilder configureRestClientBuilder(
165-
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
166-
if (networkClientConfig.getConnectionPathPrefix() != null) {
167-
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
168-
}
169-
170-
builder.setHttpClientConfigCallback(
171-
httpClientBuilder -> {
172-
if (networkClientConfig.getPassword() != null
173-
&& networkClientConfig.getUsername() != null) {
174-
final CredentialsProvider credentialsProvider =
175-
new BasicCredentialsProvider();
176-
credentialsProvider.setCredentials(
177-
AuthScope.ANY,
178-
new UsernamePasswordCredentials(
179-
networkClientConfig.getUsername(),
180-
networkClientConfig.getPassword()));
181-
182-
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
183-
}
184-
185-
if (networkClientConfig.isAllowInsecure().orElse(false)) {
186-
try {
187-
httpClientBuilder.setSSLContext(
188-
SSLContexts.custom()
189-
.loadTrustMaterial(new TrustAllStrategy())
190-
.build());
191-
} catch (final NoSuchAlgorithmException
192-
| KeyStoreException
193-
| KeyManagementException ex) {
194-
throw new IllegalStateException(
195-
"Unable to create custom SSL context", ex);
196-
}
197-
}
198-
199-
return httpClientBuilder;
200-
});
201-
if (networkClientConfig.getConnectionRequestTimeout() != null
202-
|| networkClientConfig.getConnectionTimeout() != null
203-
|| networkClientConfig.getSocketTimeout() != null) {
204-
builder.setRequestConfigCallback(
205-
requestConfigBuilder -> {
206-
if (networkClientConfig.getConnectionRequestTimeout() != null) {
207-
requestConfigBuilder.setConnectionRequestTimeout(
208-
networkClientConfig.getConnectionRequestTimeout());
209-
}
210-
if (networkClientConfig.getConnectionTimeout() != null) {
211-
requestConfigBuilder.setConnectTimeout(
212-
networkClientConfig.getConnectionTimeout());
213-
}
214-
if (networkClientConfig.getSocketTimeout() != null) {
215-
requestConfigBuilder.setSocketTimeout(
216-
networkClientConfig.getSocketTimeout());
217-
}
218-
return requestConfigBuilder;
219-
});
220-
}
221-
return builder;
222-
}
223-
224149
private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) {
225150

226151
final BulkProcessor.Builder builder =

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ under the License.
5858
</modules>
5959

6060
<properties>
61-
<flink.version>1.16.0</flink.version>
61+
<flink.version>1.16.1</flink.version>
6262
<flink.shaded.version>15.0</flink.shaded.version>
6363

6464
<jackson-bom.version>2.13.4.20221013</jackson-bom.version>

0 commit comments

Comments
 (0)