Commit 6407aab

[FLINK-25756][docs] Add documentation

1 parent 48b0c0f commit 6407aab

5 files changed: +1139 -1 lines changed

Lines changed: 261 additions & 0 deletions
---
title: Opensearch
weight: 5
type: docs
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Opensearch Connector

This connector provides sinks that can request document actions to an
[Opensearch](https://opensearch.org/) Index. To use this connector, add
the following dependency to your project:

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left">Opensearch version</th>
      <th class="text-left">Maven Dependency</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>1.x</td>
      <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
    </tr>
    <tr>
      <td>2.x</td>
      <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
    </tr>
  </tbody>
</table>

Note that the streaming connectors are currently not part of the binary
distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
about how to package the program with the libraries for cluster execution.

## Installing Opensearch

Instructions for setting up an Opensearch cluster can be found
[here](https://opensearch.org/docs/latest/opensearch/install/index/).

## Opensearch Sink

The example below shows how to configure and create a sink:

{{< tabs "a1732edd-4218-470e-adad-b1ebb4021a12" >}}
{{< tab "Java" >}}

```java
import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.connector.sink.SinkWriter
import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.http.HttpHost
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Requests

import scala.collection.JavaConverters._

val input: DataStream[String] = ...

input.sinkTo(
  new OpensearchSinkBuilder[String]
    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
      indexer.add(createIndexRequest(element)))
    .build())

def createIndexRequest(element: String): IndexRequest = {
  val json = Map(
    "data" -> element.asInstanceOf[AnyRef]
  )

  Requests.indexRequest.index("my-index").source(json.asJava)
}
```

{{< /tab >}}
{{< /tabs >}}

Note that the example only demonstrates performing a single index
request for each incoming element. Generally, the `OpensearchEmitter`
can be used to perform requests of different types (e.g.,
`DeleteRequest`, `UpdateRequest`, etc.).
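
For instance, the emitter can inspect each element and choose a request type accordingly. Below is a minimal Java sketch, assuming a `DELETE:` prefix convention (purely illustrative, not part of the connector API) to mark elements whose documents should be removed:

```java
import org.opensearch.action.delete.DeleteRequest;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) -> {
                if (element.startsWith("DELETE:")) {
                    // Issue a delete for the document whose ID follows the marker.
                    indexer.add(new DeleteRequest("my-index", element.substring(7)));
                } else {
                    // Fall back to the index request from the example above.
                    indexer.add(createIndexRequest(element));
                }
            })
        .build());
```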

Internally, each parallel instance of the Flink Opensearch Sink uses
a `BulkProcessor` to send action requests to the cluster.
It buffers elements before sending them to the cluster in bulk. The `BulkProcessor`
executes bulk requests one at a time, i.e. there are never two concurrent
flushes of the buffered actions in progress.

### Opensearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
at-least-once delivery of action requests to Opensearch clusters. It does
so by waiting for all pending action requests in the `BulkProcessor` at the
time of checkpoints. This effectively ensures that all requests issued before the
checkpoint was triggered have been successfully acknowledged by Opensearch before
the sink proceeds to process more records.

More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).

To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:

{{< tabs "aa0d1e93-4844-40d7-b0ec-9ec37e731a5f" >}}
{{< tab "Java" >}}
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```

{{< /tab >}}
{{< /tabs >}}

<p style="border-radius: 5px; padding: 5px" class="bg-info">
<b>IMPORTANT</b>: Checkpointing is not enabled by default, but the default delivery guarantee is <code>AT_LEAST_ONCE</code>.
Without checkpointing, the sink buffers requests until the job finishes or the <code>BulkProcessor</code> flushes automatically.
By default, the <code>BulkProcessor</code> flushes after <code>1000</code> added actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
</p>
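
If you prefer to be explicit rather than rely on the default, the sink builder can also state the delivery guarantee and a smaller flush threshold directly. A sketch, assuming the builder exposes a <code>setDeliveryGuarantee</code> setter taking `DeliveryGuarantee` from `org.apache.flink.connector.base`, as the unified sink builders do:

```java
import org.apache.flink.connector.base.DeliveryGuarantee;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        // State the guarantee explicitly instead of relying on the default.
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        // Flush well before the 1000-action default so fewer requests sit in the buffer.
        .setBulkFlushMaxActions(100)
        .build());
```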

<p style="border-radius: 5px; padding: 5px" class="bg-info">
By using <code>UpdateRequest</code>s with deterministic IDs and the upsert method, it is possible to achieve exactly-once semantics in Opensearch when <code>AT_LEAST_ONCE</code> delivery is configured for the connector.
</p>
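
As a sketch of such an idempotent write, assuming the element itself can serve as the deterministic document ID (`createUpsertRequest` is a hypothetical helper, used from an emitter just like `createIndexRequest` above):

```java
import org.opensearch.action.update.UpdateRequest;

// Deterministic ID + upsert: replaying the same element after a failure
// overwrites the same document instead of creating a duplicate.
private static UpdateRequest createUpsertRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return new UpdateRequest("my-index", element) // element doubles as the document ID
        .doc(json)
        .upsert(json);
}
```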

### Handling Failing Opensearch Requests

Opensearch action requests may fail due to a variety of reasons, including
temporarily saturated node queue capacity or malformed documents to be indexed.
The Flink Opensearch Sink allows the user to retry requests by specifying a backoff policy.

Below is an example:

{{< tabs "adb958b3-5dd5-476e-b946-ace3335628ea" >}}
{{< tab "Java" >}}
```java
DataStream<String> input = ...;

input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
        .build());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val input: DataStream[String] = ...

input.sinkTo(
  new OpensearchSinkBuilder[String]
    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
      indexer.add(createIndexRequest(element)))
    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
    .build())
```

{{< /tab >}}
{{< /tabs >}}

The above example will let the sink re-add requests that failed due to resource constraints (e.g.
queue capacity saturation). For all other failures, such as malformed documents, the sink will fail.
If no backoff strategy is configured (i.e. `FlushBackoffType.NONE`), the sink will fail for any kind of error.

<p style="border-radius: 5px; padding: 5px" class="bg-danger">
<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
on failures will lead to longer checkpoints, as the sink will also
need to wait for the re-added requests to be flushed when checkpointing.
For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
will need to wait until Opensearch node queues have enough capacity for
all the pending requests, or until the maximum number of retries has been reached.
</p>

### Configuring the Internal Bulk Processor

How the internal `BulkProcessor` flushes buffered action requests can be further
configured using the following methods of the `OpensearchSinkBuilder`:

* **setBulkFlushMaxActions(int numMaxActions)**: Maximum number of actions to buffer before flushing.
* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing.
* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the number or size of buffered actions.

Configuring how temporary request errors are retried is also supported (see the sketch after this list):
* **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`, the number of retries to attempt, and the delay (in milliseconds) for the backoff. For constant backoff, this
is simply the delay between each retry. For exponential backoff, this is the initial base delay.
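
Putting these together, a sketch of a sink configured with illustrative values (the thresholds are arbitrary; `FlushBackoffType` comes from the same package as `OpensearchSinkBuilder`, as in the backoff example above):

```java
input.sinkTo(
    new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
            (element, context, indexer) ->
                indexer.add(createIndexRequest(element)))
        .setBulkFlushMaxActions(500)   // flush after 500 buffered actions...
        .setBulkFlushMaxSizeMb(5)      // ...or 5 MB of buffered data...
        .setBulkFlushInterval(10000)   // ...or every 10 seconds, whichever comes first
        .setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 3, 500) // retry 3 times, 500 ms apart
        .build());
```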

More information about Opensearch can be found [here](https://opensearch.org/).

## Packaging the Opensearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a
so-called uber-jar (executable jar) containing all your dependencies
(see [here]({{< ref "docs/dev/configuration" >}}) for further information).

Alternatively, you can put the connector's jar file into Flink's `lib/` folder to make it available
system-wide, i.e. for all jobs being run.

{{< top >}}
