Skip to content

Commit 2aacffb

Browse files
committed
[FLINK-30488] OpenSearch implementation of Async Sink
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent d0c5059 commit 2aacffb

File tree

13 files changed

+1543
-1
lines changed

13 files changed

+1543
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.streaming.tests;
19+
20+
import org.apache.flink.api.common.functions.FlatMapFunction;
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
import org.apache.flink.api.java.utils.ParameterTool;
23+
import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSink;
24+
import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkBuilder;
25+
import org.apache.flink.streaming.api.datastream.DataStream;
26+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
27+
import org.apache.flink.util.Collector;
28+
29+
import org.apache.http.HttpHost;
30+
import org.opensearch.action.index.IndexRequest;
31+
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
35+
/** End to end test for OpensearchAsyncSink. */
36+
public class OpensearchAsyncSinkExample {
37+
38+
public static void main(String[] args) throws Exception {
39+
40+
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
41+
42+
if (parameterTool.getNumberOfParameters() < 2) {
43+
System.out.println(
44+
"Missing parameters!\n" + "Usage: --numRecords <numRecords> --index <index>");
45+
return;
46+
}
47+
48+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
49+
env.enableCheckpointing(5000);
50+
51+
DataStream<Tuple2<String, String>> source =
52+
env.fromSequence(0, parameterTool.getInt("numRecords") - 1)
53+
.flatMap(
54+
new FlatMapFunction<Long, Tuple2<String, String>>() {
55+
@Override
56+
public void flatMap(
57+
Long value, Collector<Tuple2<String, String>> out) {
58+
final String key = String.valueOf(value);
59+
final String message = "message #" + value;
60+
out.collect(Tuple2.of(key, message + "update #1"));
61+
out.collect(Tuple2.of(key, message + "update #2"));
62+
}
63+
});
64+
65+
List<HttpHost> httpHosts = new ArrayList<>();
66+
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
67+
68+
OpensearchAsyncSinkBuilder<Tuple2<String, String>> osSinkBuilder =
69+
OpensearchAsyncSink.<Tuple2<String, String>>builder()
70+
.setHosts(new HttpHost("localhost:9200"))
71+
.setElementConverter(
72+
(element, context) ->
73+
new IndexRequest("my-index")
74+
.id(element.f0.toString())
75+
.source(element.f1));
76+
77+
source.sinkTo(osSinkBuilder.build());
78+
79+
env.execute("Opensearch end to end async sink test example");
80+
}
81+
}

flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484

+12
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkITCase does not satisfy: only one of the following predicates match:\
2+
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
3+
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
4+
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
5+
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
6+
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
7+
org.apache.flink.connector.opensearch.sink.OpensearchAsyncWriterITCase does not satisfy: only one of the following predicates match:\
8+
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
9+
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
10+
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
11+
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
12+
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
113
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy: only one of the following predicates match:\
214
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
315
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\

flink-connector-opensearch/pom.xml

+8
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ under the License.
118118
<type>test-jar</type>
119119
</dependency>
120120

121+
<dependency>
122+
<groupId>org.apache.flink</groupId>
123+
<artifactId>flink-connector-base</artifactId>
124+
<version>${flink.version}</version>
125+
<scope>test</scope>
126+
<type>test-jar</type>
127+
</dependency>
128+
121129
<!-- Opensearch table descriptor testing -->
122130
<dependency>
123131
<groupId>org.apache.flink</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.opensearch.sink;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
23+
import org.opensearch.action.DocWriteRequest;
24+
import org.opensearch.action.delete.DeleteRequest;
25+
import org.opensearch.action.index.IndexRequest;
26+
import org.opensearch.action.update.UpdateRequest;
27+
import org.opensearch.common.bytes.BytesReference;
28+
import org.opensearch.common.io.stream.BytesStreamOutput;
29+
import org.opensearch.common.io.stream.InputStreamStreamInput;
30+
import org.opensearch.common.io.stream.StreamInput;
31+
import org.opensearch.common.io.stream.StreamOutput;
32+
33+
import java.io.DataInputStream;
34+
import java.io.DataOutputStream;
35+
import java.io.IOException;
36+
import java.io.Serializable;
37+
38+
/**
39+
* Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable},
40+
* required by AsyncSink scaffolding.
41+
*
42+
* @param <T> type of the write request
43+
*/
44+
@PublicEvolving
45+
public class DocSerdeRequest<T> implements Serializable {
46+
private static final long serialVersionUID = 1L;
47+
private final DocWriteRequest<T> request;
48+
49+
private DocSerdeRequest(DocWriteRequest<T> request) {
50+
this.request = request;
51+
}
52+
53+
public DocWriteRequest<T> getRequest() {
54+
return request;
55+
}
56+
57+
static <T> DocSerdeRequest<T> from(DocWriteRequest<T> request) {
58+
return new DocSerdeRequest<>(request);
59+
}
60+
61+
static DocSerdeRequest<?> readFrom(long requestSize, DataInputStream in) throws IOException {
62+
try (final StreamInput stream = new InputStreamStreamInput(in, requestSize)) {
63+
return new DocSerdeRequest<>(readDocumentRequest(stream));
64+
}
65+
}
66+
67+
void writeTo(DataOutputStream out) throws IOException {
68+
try (BytesStreamOutput stream = new BytesStreamOutput()) {
69+
writeDocumentRequest(stream, request);
70+
out.write(BytesReference.toBytes(stream.bytes()));
71+
}
72+
}
73+
74+
/** Read a document write (index/delete/update) request. */
75+
private static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException {
76+
byte type = in.readByte();
77+
DocWriteRequest<?> docWriteRequest;
78+
if (type == 0) {
79+
docWriteRequest = new IndexRequest(in);
80+
} else if (type == 1) {
81+
docWriteRequest = new DeleteRequest(in);
82+
} else if (type == 2) {
83+
docWriteRequest = new UpdateRequest(in);
84+
} else {
85+
throw new IllegalStateException("Invalid request type [" + type + " ]");
86+
}
87+
return docWriteRequest;
88+
}
89+
90+
/** Write a document write (index/delete/update) request. */
91+
private static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)
92+
throws IOException {
93+
if (request instanceof IndexRequest) {
94+
out.writeByte((byte) 0);
95+
((IndexRequest) request).writeTo(out);
96+
} else if (request instanceof DeleteRequest) {
97+
out.writeByte((byte) 1);
98+
((DeleteRequest) request).writeTo(out);
99+
} else if (request instanceof UpdateRequest) {
100+
out.writeByte((byte) 2);
101+
((UpdateRequest) request).writeTo(out);
102+
} else {
103+
throw new IllegalStateException(
104+
"Invalid request [" + request.getClass().getSimpleName() + " ]");
105+
}
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.opensearch.sink;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.PublicEvolving;
23+
import org.apache.flink.connector.base.sink.AsyncSinkBase;
24+
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
25+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
26+
import org.apache.flink.core.io.SimpleVersionedSerializer;
27+
28+
import org.apache.http.HttpHost;
29+
30+
import java.io.IOException;
31+
import java.util.Collection;
32+
import java.util.Collections;
33+
import java.util.List;
34+
35+
import static org.apache.flink.util.Preconditions.checkArgument;
36+
import static org.apache.flink.util.Preconditions.checkNotNull;
37+
38+
/**
39+
* Apache Flink's Async Sink to insert or update data in an Opensearch index (please see {@link
40+
* OpensearchAsyncWriter}).
41+
*
42+
* @param <InputT> type of the records converted to Opensearch actions (instances of {@link
43+
* DocSerdeRequest})
44+
* @see OpensearchAsyncSinkBuilder on how to construct an OpensearchAsyncSink
45+
*/
46+
@PublicEvolving
47+
public class OpensearchAsyncSink<InputT> extends AsyncSinkBase<InputT, DocSerdeRequest<?>> {
48+
private static final long serialVersionUID = 1L;
49+
50+
private final List<HttpHost> hosts;
51+
private final NetworkClientConfig networkClientConfig;
52+
53+
/**
54+
* Constructor creating an Opensearch async sink.
55+
*
56+
* @param maxBatchSize the maximum size of a batch of entries that may be sent
57+
* @param maxInFlightRequests the maximum number of in flight requests that may exist, if any
58+
* more in flight requests need to be initiated once the maximum has been reached, then it
59+
* will be blocked until some have completed
60+
* @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add
61+
* elements will be blocked while the number of elements in the buffer is at the maximum
62+
* @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to Opensearch
63+
* measured in bytes
64+
* @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the
65+
* buffer, if any element reaches this age, the entire buffer will be flushed immediately
66+
* @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the
67+
* buffer, a record of size larger than this will be rejected when passed to the sink
68+
* @param elementConverter converting incoming records to Opensearch write document requests
69+
* @param hosts the reachable Opensearch cluster nodes
70+
* @param networkClientConfig describing properties of the network connection used to connect to
71+
* the Opensearch cluster
72+
*/
73+
OpensearchAsyncSink(
74+
int maxBatchSize,
75+
int maxInFlightRequests,
76+
int maxBufferedRequests,
77+
long maxBatchSizeInBytes,
78+
long maxTimeInBufferMS,
79+
long maxRecordSizeInBytes,
80+
ElementConverter<InputT, DocSerdeRequest<?>> elementConverter,
81+
List<HttpHost> hosts,
82+
NetworkClientConfig networkClientConfig) {
83+
super(
84+
elementConverter,
85+
maxBatchSize,
86+
maxInFlightRequests,
87+
maxBufferedRequests,
88+
maxBatchSizeInBytes,
89+
maxTimeInBufferMS,
90+
maxRecordSizeInBytes);
91+
this.hosts = checkNotNull(hosts);
92+
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
93+
this.networkClientConfig = checkNotNull(networkClientConfig);
94+
}
95+
96+
/**
97+
* Create a {@link OpensearchAsyncSinkBuilder} to construct a new {@link OpensearchAsyncSink}.
98+
*
99+
* @param <InputT> type of incoming records
100+
* @return {@link OpensearchAsyncSinkBuilder}
101+
*/
102+
public static <InputT> OpensearchAsyncSinkBuilder<InputT> builder() {
103+
return new OpensearchAsyncSinkBuilder<>();
104+
}
105+
106+
@Internal
107+
@Override
108+
public StatefulSinkWriter<InputT, BufferedRequestState<DocSerdeRequest<?>>> createWriter(
109+
InitContext context) throws IOException {
110+
return new OpensearchAsyncWriter<>(
111+
context,
112+
getElementConverter(),
113+
getMaxBatchSize(),
114+
getMaxInFlightRequests(),
115+
getMaxBufferedRequests(),
116+
getMaxBatchSizeInBytes(),
117+
getMaxTimeInBufferMS(),
118+
getMaxRecordSizeInBytes(),
119+
hosts,
120+
networkClientConfig,
121+
Collections.emptyList());
122+
}
123+
124+
@Internal
125+
@Override
126+
public StatefulSinkWriter<InputT, BufferedRequestState<DocSerdeRequest<?>>> restoreWriter(
127+
InitContext context,
128+
Collection<BufferedRequestState<DocSerdeRequest<?>>> recoveredState)
129+
throws IOException {
130+
return new OpensearchAsyncWriter<>(
131+
context,
132+
getElementConverter(),
133+
getMaxBatchSize(),
134+
getMaxInFlightRequests(),
135+
getMaxBufferedRequests(),
136+
getMaxBatchSizeInBytes(),
137+
getMaxTimeInBufferMS(),
138+
getMaxRecordSizeInBytes(),
139+
hosts,
140+
networkClientConfig,
141+
recoveredState);
142+
}
143+
144+
@Internal
145+
@Override
146+
public SimpleVersionedSerializer<BufferedRequestState<DocSerdeRequest<?>>>
147+
getWriterStateSerializer() {
148+
return new OpensearchWriterStateSerializer();
149+
}
150+
}

0 commit comments

Comments
 (0)