18
18
package org .apache .flink .streaming .tests ;
19
19
20
20
import org .apache .flink .api .common .functions .FlatMapFunction ;
21
- import org .apache .flink .api .common .functions .RuntimeContext ;
22
21
import org .apache .flink .api .java .tuple .Tuple2 ;
23
- import org .apache .flink .api .java .utils .ParameterTool ;
22
+ import org .apache .flink .connector .opensearch .sink .FailureHandler ;
23
+ import org .apache .flink .connector .opensearch .sink .OpensearchEmitter ;
24
+ import org .apache .flink .connector .opensearch .sink .OpensearchSinkBuilder ;
24
25
import org .apache .flink .streaming .api .datastream .DataStream ;
25
26
import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
26
- import org .apache .flink .streaming .connectors .opensearch .ActionRequestFailureHandler ;
27
- import org .apache .flink .streaming .connectors .opensearch .OpensearchSink ;
28
- import org .apache .flink .streaming .connectors .opensearch .RequestIndexer ;
29
27
import org .apache .flink .util .Collector ;
30
-
28
+ import org . apache . flink . util . ParameterTool ;
31
29
import org .apache .http .HttpHost ;
32
- import org .opensearch .action .ActionRequest ;
33
30
import org .opensearch .action .index .IndexRequest ;
34
31
import org .opensearch .action .update .UpdateRequest ;
35
32
import org .opensearch .client .Requests ;
36
33
37
- import java .util .ArrayList ;
38
34
import java .util .HashMap ;
39
- import java .util .List ;
40
35
import java .util .Map ;
41
36
42
37
/** End to end test for OpensearchSink. */
@@ -69,31 +64,21 @@ public void flatMap(
69
64
}
70
65
});
71
66
72
- List <HttpHost > httpHosts = new ArrayList <>();
73
- httpHosts .add (new HttpHost ("127.0.0.1" , 9200 , "http" ));
74
-
75
- OpensearchSink .Builder <Tuple2 <String , String >> osSinkBuilder =
76
- new OpensearchSink .Builder <>(
77
- httpHosts ,
78
- (Tuple2 <String , String > element ,
79
- RuntimeContext ctx ,
80
- RequestIndexer indexer ) -> {
67
+ OpensearchSinkBuilder <Tuple2 <String , String >> osSinkBuilder =
68
+ new OpensearchSinkBuilder <>()
69
+ .setHosts (new HttpHost ("127.0.0.1" , 9200 , "http" ))
70
+ .setEmitter ((OpensearchEmitter <Tuple2 <String , String >>) (element , writer , indexer ) -> {
81
71
indexer .add (createIndexRequest (element .f1 , parameterTool ));
82
72
indexer .add (createUpdateRequest (element , parameterTool ));
83
- });
84
-
85
- osSinkBuilder .setFailureHandler (
86
- new CustomFailureHandler (parameterTool .getRequired ("index" )));
87
-
88
- // this instructs the sink to emit after every element, otherwise they would be buffered
89
- osSinkBuilder .setBulkFlushMaxActions (1 );
90
-
91
- source .addSink (osSinkBuilder .build ());
73
+ })
74
+ .setFailureHandler (new CustomFailureHandler (parameterTool .getRequired ("index" )))
75
+ .setBulkFlushMaxActions (1 );
92
76
77
+ source .sinkTo (osSinkBuilder .build ());
93
78
env .execute ("Opensearch end to end sink test example" );
94
79
}
95
80
96
- private static class CustomFailureHandler implements ActionRequestFailureHandler {
81
+ private static class CustomFailureHandler implements FailureHandler {
97
82
98
83
private static final long serialVersionUID = 942269087742453482L ;
99
84
@@ -104,21 +89,8 @@ private static class CustomFailureHandler implements ActionRequestFailureHandler
104
89
}
105
90
106
91
@ Override
107
- public void onFailure (
108
- ActionRequest action , Throwable failure , int restStatusCode , RequestIndexer indexer )
109
- throws Throwable {
110
- if (action instanceof IndexRequest ) {
111
- Map <String , Object > json = new HashMap <>();
112
- json .put ("data" , ((IndexRequest ) action ).source ());
113
-
114
- indexer .add (
115
- Requests .indexRequest ()
116
- .index (index )
117
- .id (((IndexRequest ) action ).id ())
118
- .source (json ));
119
- } else {
120
- throw new IllegalStateException ("unexpected" );
121
- }
92
+ public void onFailure (Throwable failure ) {
93
+ throw new IllegalStateException ("Unexpected exception for index:" + index );
122
94
}
123
95
}
124
96
0 commit comments