Skip to content

Commit 5aea5c8

Browse files
committed
Addressing code review comments
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent 46b018e commit 5aea5c8

File tree

4 files changed

+43
-22
lines changed

4 files changed

+43
-22
lines changed

flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929
import org.apache.http.HttpHost;
3030
import org.opensearch.action.index.IndexRequest;
3131

32-
import java.util.ArrayList;
33-
import java.util.List;
34-
3532
/** End to end test for OpensearchAsyncSink. */
3633
public class OpensearchAsyncSinkExample {
3734

@@ -62,12 +59,9 @@ public void flatMap(
6259
}
6360
});
6461

65-
List<HttpHost> httpHosts = new ArrayList<>();
66-
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
67-
6862
OpensearchAsyncSinkBuilder<Tuple2<String, String>> osSinkBuilder =
6963
OpensearchAsyncSink.<Tuple2<String, String>>builder()
70-
.setHosts(new HttpHost("localhost:9200"))
64+
.setHosts(new HttpHost("localhost", 9200, "http"))
7165
.setElementConverter(
7266
(element, context) ->
7367
new IndexRequest("my-index")

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ public class OpensearchAsyncSink<InputT> extends AsyncSinkBase<InputT, DocSerdeR
5454
* Constructor creating an Opensearch async sink.
5555
*
5656
* @param maxBatchSize the maximum size of a batch of entries that may be sent
57-
* @param maxInFlightRequests he maximum number of in flight requests that may exist, if any
57+
* @param maxInFlightRequests the maximum number of in flight requests that may exist, if any
5858
* more in flight requests need to be initiated once the maximum has been reached, then it
5959
* will be blocked until some have completed
6060
* @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add
6161
* elements will be blocked while the number of elements in the buffer is at the maximum
62-
* @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to KDS
63-
* measured in bytes
62+
* @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to
63+
* OpenSearch measured in bytes
6464
* @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the
6565
* buffer, if any element reaches this age, the entire buffer will be flushed immediately
6666
* @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java

+36-9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.util.Arrays;
2929
import java.util.List;
30+
import java.util.concurrent.TimeUnit;
3031

3132
import static org.apache.flink.util.Preconditions.checkArgument;
3233
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -84,7 +85,7 @@ public OpensearchAsyncSinkBuilder<InputT> setElementConverter(
8485
*/
8586
public OpensearchAsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
8687
checkNotNull(hosts);
87-
checkState(hosts.length > 0, "Hosts cannot be empty.");
88+
checkArgument(hosts.length > 0, "Hosts cannot be empty.");
8889
this.hosts = Arrays.asList(hosts);
8990
return this;
9091
}
@@ -97,6 +98,7 @@ public OpensearchAsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
9798
*/
9899
public OpensearchAsyncSinkBuilder<InputT> setConnectionUsername(String username) {
99100
checkNotNull(username);
101+
checkArgument(!username.trim().isEmpty(), "Username cannot be empty");
100102
this.username = username;
101103
return this;
102104
}
@@ -109,6 +111,7 @@ public OpensearchAsyncSinkBuilder<InputT> setConnectionUsername(String username)
109111
*/
110112
public OpensearchAsyncSinkBuilder<InputT> setConnectionPassword(String password) {
111113
checkNotNull(password);
114+
checkArgument(!password.trim().isEmpty(), "Password cannot be empty");
112115
this.password = password;
113116
return this;
114117
}
@@ -129,40 +132,66 @@ public OpensearchAsyncSinkBuilder<InputT> setConnectionPathPrefix(String prefix)
129132
* Sets the timeout for requesting the connection of the Opensearch cluster from the connection
130133
* manager.
131134
*
132-
* @param timeout for the connection request
135+
* @param timeout timeout for the connection request (in milliseconds)
133136
* @return this builder
134137
*/
135138
public OpensearchAsyncSinkBuilder<InputT> setConnectionRequestTimeout(int timeout) {
136-
checkState(timeout >= 0, "Connection request timeout must be larger than or equal to 0.");
139+
checkArgument(
140+
timeout >= 0, "Connection request timeout must be larger than or equal to 0.");
137141
this.connectionRequestTimeout = timeout;
138142
return this;
139143
}
140144

141145
/**
142146
* Sets the timeout for establishing a connection of the Opensearch cluster.
143147
*
144-
* @param timeout for the connection
148+
* @param timeout timeout for the connection (in milliseconds)
145149
* @return this builder
146150
*/
147151
public OpensearchAsyncSinkBuilder<InputT> setConnectionTimeout(int timeout) {
148-
checkState(timeout >= 0, "Connection timeout must be larger than or equal to 0.");
152+
checkArgument(timeout >= 0, "Connection timeout must be larger than or equal to 0.");
149153
this.connectionTimeout = timeout;
150154
return this;
151155
}
152156

157+
/**
158+
* Sets the timeout for establishing a connection of the Opensearch cluster.
159+
*
160+
* @param timeout timeout for the connection (in milliseconds)
161+
* @param timeUnit timeout time unit
162+
* @return this builder
163+
*/
164+
public OpensearchAsyncSinkBuilder<InputT> setConnectionTimeout(int timeout, TimeUnit timeUnit) {
165+
checkNotNull(timeUnit, "TimeUnit cannot be null.");
166+
return setConnectionTimeout((int) timeUnit.toMillis(timeout));
167+
}
168+
153169
/**
154170
* Sets the timeout for waiting for data or, put differently, a maximum period inactivity
155171
* between two consecutive data packets.
156172
*
157-
* @param timeout for the socket
173+
* @param timeout timeout for the socket (in milliseconds)
158174
* @return this builder
159175
*/
160176
public OpensearchAsyncSinkBuilder<InputT> setSocketTimeout(int timeout) {
161-
checkState(timeout >= 0, "Socket timeout must be larger than or equal to 0.");
177+
checkArgument(timeout >= 0, "Socket timeout must be larger than or equal to 0.");
162178
this.socketTimeout = timeout;
163179
return this;
164180
}
165181

182+
/**
183+
* Sets the timeout for waiting for data or, put differently, a maximum period inactivity
184+
* between two consecutive data packets.
185+
*
186+
* @param timeout timeout for the socket
187+
* @param timeUnit timeout time unit
188+
* @return this builder
189+
*/
190+
public OpensearchAsyncSinkBuilder<InputT> setSocketTimeout(int timeout, TimeUnit timeUnit) {
191+
checkNotNull(timeUnit, "TimeUnit cannot be null.");
192+
return setSocketTimeout((int) timeUnit.toMillis(timeout));
193+
}
194+
166195
/**
167196
* Allows to bypass the certificates chain validation and connect to insecure network endpoints
168197
* (for example, servers which use self-signed certificates).
@@ -177,8 +206,6 @@ public OpensearchAsyncSinkBuilder<InputT> setAllowInsecure(boolean allowInsecure
177206

178207
@Override
179208
public OpensearchAsyncSink<InputT> build() {
180-
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
181-
182209
return new OpensearchAsyncSink<InputT>(
183210
nonNullOrDefault(
184211
getMaxBatchSize(),

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ void testThrowIfElementConverterNotSet() {
7171
@Test
7272
void testThrowIfSetInvalidTimeouts() {
7373
assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build())
74-
.isInstanceOf(IllegalStateException.class);
74+
.isInstanceOf(IllegalArgumentException.class);
7575
assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1).build())
76-
.isInstanceOf(IllegalStateException.class);
76+
.isInstanceOf(IllegalArgumentException.class);
7777
assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1).build())
78-
.isInstanceOf(IllegalStateException.class);
78+
.isInstanceOf(IllegalArgumentException.class);
7979
}
8080

8181
private OpensearchAsyncSinkBuilder<Object> createEmptyBuilder() {

0 commit comments

Comments
 (0)