diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c008740
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,169 @@
+
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-connector-redis
+ 1.16-SNAPSHOT
+ jar
+ Flink : Connectors : Redis
+
+
+
+ The Apache Software License, Version 2.0
+ https://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+ UTF-8
+ UTF-8
+
+
+ 1.15.1
+
+ 1.8
+ ${target.java.version}
+ ${target.java.version}
+
+
+
+ 4.2.3
+ 1.7.36
+ 1.17.3
+
+
+ 5.8.2
+
+
+
+
+
+
+
+ redis.clients
+ jedis
+ ${jedis.version}
+ compile
+
+
+ org.slf4j
+ slf4j.api
+
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
+
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+ provided
+ true
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+
+
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ %regex[.*ITCase.*]
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java b/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java
new file mode 100644
index 0000000..19b7e3e
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis;
+
+import redis.clients.jedis.Protocol;
+
+/** */
+public class JedisConfigConstants {
+
+ public static final String REDIS_HOST = "redis.host";
+
+ public static final String REDIS_PORT = "redis.port";
+
+ public static final String DEFAULT_REDIS_HOST = Protocol.DEFAULT_HOST;
+
+ public static final int DEFAULT_REDIS_PORT = Protocol.DEFAULT_PORT;
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/JedisUtils.java b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
new file mode 100644
index 0000000..d5c92e0
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis;
+
+import redis.clients.jedis.Connection;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.UnifiedJedis;
+
+import java.util.Properties;
+
+/** */
+public class JedisUtils {
+
+ public static UnifiedJedis createJedis(Properties configProps) {
+
+ String host =
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_HOST, JedisConfigConstants.DEFAULT_REDIS_HOST);
+
+ int port =
+ Integer.parseInt(
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_PORT,
+ Integer.toString(JedisConfigConstants.DEFAULT_REDIS_PORT)));
+
+ return new UnifiedJedis(new HostAndPort(host, port));
+ }
+
+ public static Connection createConnection(Properties configProps) {
+
+ String host =
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_HOST, JedisConfigConstants.DEFAULT_REDIS_HOST);
+
+ int port =
+ Integer.parseInt(
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_PORT,
+ Integer.toString(JedisConfigConstants.DEFAULT_REDIS_PORT)));
+
+ return new Connection(host, port);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
new file mode 100644
index 0000000..1a49485
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
@@ -0,0 +1,54 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.exceptions.JedisDataException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class RedisAsyncWriter extends AsyncSinkWriter {
+
+ private final UnifiedJedis jedis;
+
+ public RedisAsyncWriter(
+ UnifiedJedis jedis,
+ RedisConverter converter,
+ RedisSinkConfig sinkConfig,
+ Sink.InitContext context) {
+ super(
+ converter,
+ context,
+ sinkConfig.maxBatchSize,
+ sinkConfig.maxInFlightRequests,
+ sinkConfig.maxBufferedRequests,
+ sinkConfig.maxBatchSizeInBytes,
+ sinkConfig.maxTimeInBufferMS,
+ sinkConfig.maxRecordSizeInBytes);
+ this.jedis = jedis;
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List requests, Consumer> consumer) {
+ List toRetry = new ArrayList<>();
+ for (RedisWriteRequest request : requests) {
+ try {
+ request.write(jedis);
+ } catch (JedisDataException de) {
+ // not worth retrying; will fail again
+ // TODO ?
+ } catch (Exception e) {
+ toRetry.add(request);
+ }
+ }
+ consumer.accept(toRetry);
+ }
+
+ @Override
+ protected long getSizeInBytes(RedisWriteRequest request) {
+ return request.getSizeInBytes();
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java
new file mode 100644
index 0000000..caf9243
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java
@@ -0,0 +1,18 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+public class RedisConverter implements ElementConverter {
+
+ private final RedisSerializer serializer;
+
+ public RedisConverter(RedisSerializer serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public RedisWriteRequest apply(T input, SinkWriter.Context context) {
+ return serializer.serialize(input);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java
new file mode 100644
index 0000000..d1e323e
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java
@@ -0,0 +1,10 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public interface RedisSerializer extends Function, Serializable {
+
+ RedisWriteRequest serialize(T input);
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
new file mode 100644
index 0000000..bb6f63b
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
@@ -0,0 +1,32 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.redis.JedisUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class RedisSink implements Sink {
+
+ private final Properties config;
+ private final RedisSerializer serializer;
+
+ private final RedisSinkConfig sinkConfig;
+
+ public RedisSink(Properties config, RedisSinkConfig sinkConfig, RedisSerializer serializer) {
+ this.config = config;
+ this.sinkConfig = sinkConfig;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public SinkWriter createWriter(Sink.InitContext initContext) throws IOException {
+// return new RedisAsyncWriter<>(
+// JedisUtils.createJedis(config),
+// new RedisConverter<>(serializer),
+// sinkConfig,
+// initContext);
+ return new RedisSyncWriter(JedisUtils.createConnection(config), serializer);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java
new file mode 100644
index 0000000..8d3d015
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java
@@ -0,0 +1,81 @@
+package org.apache.flink.connector.redis.sink2;
+
+import java.io.Serializable;
+
+public class RedisSinkConfig implements Serializable {
+
+ public final int maxBatchSize;
+ public final int maxInFlightRequests;
+ public final int maxBufferedRequests;
+ public final long maxBatchSizeInBytes;
+ public final long maxTimeInBufferMS;
+ public final long maxRecordSizeInBytes;
+
+ public RedisSinkConfig(
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes) {
+ this.maxBatchSize = maxBatchSize;
+ this.maxInFlightRequests = maxInFlightRequests;
+ this.maxBufferedRequests = maxBufferedRequests;
+ this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
+ this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private int maxBatchSize = 10;
+ private int maxInFlightRequests = 1;
+ private int maxBufferedRequests = 100;
+ private long maxBatchSizeInBytes = 110;
+ private long maxTimeInBufferMS = 1_000;
+ private long maxRecordSizeInBytes = maxBatchSizeInBytes;
+
+ public Builder maxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ public Builder maxInFlightRequests(int maxInFlightRequests) {
+ this.maxInFlightRequests = maxInFlightRequests;
+ return this;
+ }
+
+ public Builder maxBufferedRequests(int maxBufferedRequests) {
+ this.maxBufferedRequests = maxBufferedRequests;
+ return this;
+ }
+
+ public Builder maxBatchSizeInBytes(long maxBatchSizeInBytes) {
+ this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+ return this;
+ }
+
+ public Builder maxTimeInBufferMS(long maxTimeInBufferMS) {
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
+ return this;
+ }
+
+ public Builder maxRecordSizeInBytes(long maxRecordSizeInBytes) {
+ this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+ return this;
+ }
+
+ public RedisSinkConfig build() {
+ return new RedisSinkConfig(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
new file mode 100644
index 0000000..0810c7f
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
@@ -0,0 +1,69 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import redis.clients.jedis.Connection;
+import redis.clients.jedis.MultiNodePipelineBase;
+import redis.clients.jedis.Pipeline;
+
+import java.io.IOException;
+
+public class RedisSyncWriter implements SinkWriter {
+
+ private final Connection conn;
+ private final Pipeline pipe;
+ private final MultiNodePipelineBase mpipe;
+
+ private final RedisSerializer serializer;
+
+ public RedisSyncWriter(
+ Connection connection, RedisSerializer serializer) {
+ this.conn = connection;
+ this.pipe = new Pipeline(this.conn);
+ this.mpipe = null;
+ this.serializer = serializer;
+ }
+
+ public RedisSyncWriter(
+ Pipeline pipe, RedisSerializer serializer) {
+ this.conn = null;
+ this.pipe = pipe;
+ this.mpipe = null;
+ this.serializer = serializer;
+ }
+
+ public RedisSyncWriter(
+ MultiNodePipelineBase mpipe, RedisSerializer serializer) {
+ this.conn = null;
+ this.pipe = null;
+ this.mpipe = mpipe;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void write(IN input, Context context) throws IOException, InterruptedException {
+ RedisWriteRequest request = serializer.serialize(input);
+ if (pipe != null) {
+ request.write(pipe);
+ } else {
+ request.write(mpipe);
+ }
+ }
+
+ @Override
+ public void flush(boolean b) throws IOException, InterruptedException {
+ if (pipe != null) {
+ pipe.sync();
+ } else {
+ mpipe.sync();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (conn != null) {
+ conn.close();
+ } else {
+ mpipe.close();
+ }
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
new file mode 100644
index 0000000..e651043
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
@@ -0,0 +1,14 @@
+package org.apache.flink.connector.redis.sink2;
+
+import redis.clients.jedis.commands.JedisCommands;
+import redis.clients.jedis.commands.PipelineCommands;
+
+import java.io.Serializable;
+
+public interface RedisWriteRequest extends Serializable {
+ void write(JedisCommands jedis);
+
+ void write(PipelineCommands pipe);
+
+ long getSizeInBytes();
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
new file mode 100644
index 0000000..2dda450
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
@@ -0,0 +1,37 @@
+package org.apache.flink.connector.redis.sink2;
+
+import redis.clients.jedis.commands.JedisCommands;
+import redis.clients.jedis.commands.PipelineCommands;
+import redis.clients.jedis.params.XAddParams;
+
+import java.util.Map;
+
+public class StreamWriteRequest implements RedisWriteRequest {
+
+ private final String key;
+ private final Map value;
+
+ public StreamWriteRequest(String key, Map value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void write(JedisCommands jedis) {
+ jedis.xadd(key, XAddParams.xAddParams(), value);
+ }
+
+ @Override
+ public void write(PipelineCommands pipe) {
+ pipe.xadd(key, XAddParams.xAddParams(), value);
+ }
+
+ @Override
+ public long getSizeInBytes() {
+ long size = 9;
+ for (Map.Entry entry: value.entrySet()) {
+ size += 10 + entry.getKey().length() + entry.getValue().length();
+ }
+ return size;
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
new file mode 100644
index 0000000..2d85b98
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
@@ -0,0 +1,30 @@
+package org.apache.flink.connector.redis.sink2;
+
+import redis.clients.jedis.commands.JedisCommands;
+import redis.clients.jedis.commands.PipelineCommands;
+
+public class StringWriteRequest implements RedisWriteRequest {
+
+ private final String key;
+ private final String value;
+
+ public StringWriteRequest(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void write(JedisCommands jedis) {
+ jedis.set(key, value);
+ }
+
+ @Override
+ public void write(PipelineCommands pipe) {
+ pipe.set(key, value);
+ }
+
+ @Override
+ public long getSizeInBytes() {
+ return 18 + key.length() + value.length();
+ }
+}
diff --git a/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java b/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..54442a7
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.UnifiedJedis;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** */
+@Testcontainers
+public abstract class RedisITCaseBase extends AbstractTestBase {
+
+ public static final String REDIS_IMAGE = "redis";
+ private static final int REDIS_PORT = 6379;
+
+ private static final AtomicBoolean running = new AtomicBoolean(false);
+
+ @Container
+ private static final GenericContainer> redis =
+ new GenericContainer<>(DockerImageName.parse(REDIS_IMAGE)).withExposedPorts(REDIS_PORT);
+
+ public static MiniClusterWithClientResource cluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(2)
+ .setNumberTaskManagers(1)
+ .build());
+ protected UnifiedJedis jedis;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ cluster.before();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ cluster.after();
+ }
+
+ protected static synchronized void start() {
+ if (!running.get()) {
+ redis.start();
+ running.set(true);
+ }
+ }
+
+ protected static void stop() {
+ redis.stop();
+ running.set(false);
+ }
+
+ @BeforeEach
+ public void setUp() {
+ Properties config = getConfigProperties();
+ try (Jedis j =
+ new Jedis(
+ config.getProperty(JedisConfigConstants.REDIS_HOST),
+ Integer.parseInt(config.getProperty(JedisConfigConstants.REDIS_PORT)))) {
+ j.flushAll();
+ }
+
+ jedis = JedisUtils.createJedis(config);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ protected Properties getConfigProperties() {
+ start();
+
+ Properties configProps = new Properties();
+ // configProps.setProperty(RedisConfigConstants.REDIS_HOST, redis.getContainerIpAddress());
+ configProps.setProperty(JedisConfigConstants.REDIS_HOST, redis.getHost());
+ configProps.setProperty(
+ JedisConfigConstants.REDIS_PORT, Integer.toString(redis.getFirstMappedPort()));
+ return configProps;
+ }
+}
diff --git a/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java b/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java
new file mode 100644
index 0000000..581b204
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java
@@ -0,0 +1,61 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.redis.RedisITCaseBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RedisSinkTest extends RedisITCaseBase {
+
+ @Test
+ public void testStringCommand() throws Exception {
+ RedisSerializer serializer = input -> new StringWriteRequest("key-"+input, "value-"+input);
+ RedisSinkConfig redisSinkConfig = RedisSinkConfig.builder().build();
+ RedisSink underTest = new RedisSink<>(getConfigProperties(), redisSinkConfig, serializer);
+
+ List source = Arrays.asList("one", "two", "three");
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.fromCollection(source).sinkTo(underTest);
+ env.execute();
+
+ // verify results
+ source.forEach(entry -> assertEquals("value-" + entry, jedis.get("key-"+entry)));
+ }
+
+ @Test
+ public void testStreamCommand() throws Exception {
+ RedisSerializer> serializer =
+ input -> {
+ Map value = new HashMap<>();
+ value.put(input.f1, input.f2);
+ return new StreamWriteRequest(input.f0, value);
+ };
+ RedisSinkConfig redisSinkConfig = RedisSinkConfig.builder().build();
+ RedisSink> underTest =
+ new RedisSink<>(getConfigProperties(), redisSinkConfig, serializer);
+
+ List> source =
+ Arrays.asList(
+ Tuple3.of("one", "onekey", "onevalue"),
+ Tuple3.of("two", "firstkey", "firstvalue"),
+ Tuple3.of("two", "secontkey", "secondvalue"));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.fromCollection(source).sinkTo(underTest);
+ env.execute();
+
+ // verify results
+ assertEquals(1, jedis.xlen("one"));
+ assertEquals(2, jedis.xlen("two"));
+ }
+}