From 92878f5c6a17041b52f2b34143547c2fc576c572 Mon Sep 17 00:00:00 2001
From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>
Date: Wed, 10 Aug 2022 13:42:52 +0600
Subject: [PATCH 1/2] [FLINK-15571] Add Redis Stream and String sinks
---
pom.xml | 169 ++++++++++++++++++
.../connector/redis/JedisConfigConstants.java | 32 ++++
.../flink/connector/redis/JedisUtils.java | 42 +++++
.../redis/sink2/RedisAsyncWriter.java | 54 ++++++
.../connector/redis/sink2/RedisConverter.java | 18 ++
.../redis/sink2/RedisSerializer.java | 10 ++
.../connector/redis/sink2/RedisSink.java | 31 ++++
.../redis/sink2/RedisSinkConfig.java | 81 +++++++++
.../redis/sink2/RedisSyncWriter.java | 49 +++++
.../redis/sink2/RedisWriteRequest.java | 11 ++
.../redis/sink2/StreamWriteRequest.java | 31 ++++
.../redis/sink2/StringWriteRequest.java | 24 +++
.../connector/redis/RedisITCaseBase.java | 110 ++++++++++++
.../connector/redis/sink2/RedisSinkTest.java | 61 +++++++
14 files changed, 723 insertions(+)
create mode 100644 pom.xml
create mode 100644 src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/JedisUtils.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
create mode 100644 src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
create mode 100644 src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java
create mode 100644 src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c008740
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,169 @@
+
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-connector-redis
+ 1.16-SNAPSHOT
+ jar
+ Flink : Connectors : Redis
+
+
+
+ The Apache Software License, Version 2.0
+ https://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+ UTF-8
+ UTF-8
+
+
+ 1.15.1
+
+ 1.8
+ ${target.java.version}
+ ${target.java.version}
+
+
+
+ 4.2.3
+ 1.7.36
+ 1.17.3
+
+
+ 5.8.2
+
+
+
+
+
+
+
+ redis.clients
+ jedis
+ ${jedis.version}
+ compile
+
+
+ org.slf4j
+ slf4j.api
+
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
+
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+ provided
+ true
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+
+
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ %regex[.*ITCase.*]
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java b/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java
new file mode 100644
index 0000000..19b7e3e
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis;
+
+import redis.clients.jedis.Protocol;
+
+/** */
+public class JedisConfigConstants {
+
+ public static final String REDIS_HOST = "redis.host";
+
+ public static final String REDIS_PORT = "redis.port";
+
+ public static final String DEFAULT_REDIS_HOST = Protocol.DEFAULT_HOST;
+
+ public static final int DEFAULT_REDIS_PORT = Protocol.DEFAULT_PORT;
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/JedisUtils.java b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
new file mode 100644
index 0000000..8cbdc79
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis;
+
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.UnifiedJedis;
+
+import java.util.Properties;
+
+/** */
+public class JedisUtils {
+
+ public static UnifiedJedis createJedis(Properties configProps) {
+
+ String host =
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_HOST, JedisConfigConstants.DEFAULT_REDIS_HOST);
+
+ int port =
+ Integer.parseInt(
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_PORT,
+ Integer.toString(JedisConfigConstants.DEFAULT_REDIS_PORT)));
+
+ return new UnifiedJedis(new HostAndPort(host, port));
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
new file mode 100644
index 0000000..53ec1f7
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
@@ -0,0 +1,54 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.exceptions.JedisDataException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class RedisAsyncWriter extends AsyncSinkWriter {
+
+ private final UnifiedJedis jedis;
+
+ public RedisAsyncWriter(
+ UnifiedJedis jedis,
+ RedisConverter converter,
+ RedisSinkConfig sinkConfig,
+ Sink.InitContext context) {
+ super(
+ converter,
+ context,
+ sinkConfig.maxBatchSize,
+ sinkConfig.maxInFlightRequests,
+ sinkConfig.maxBufferedRequests,
+ sinkConfig.maxBatchSizeInBytes,
+ sinkConfig.maxTimeInBufferMS,
+ sinkConfig.maxRecordSizeInBytes);
+ this.jedis = jedis;
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List requests, Consumer> consumer) {
+ List toRetry = new ArrayList<>();
+ for (RedisWriteRequest request : requests) {
+ try {
+ request.write(jedis);
+ } catch (JedisDataException de) {
+ // not worth retrying; will fail again
+ // TODO
+ } catch (Exception e) {
+ toRetry.add(request);
+ }
+ }
+ consumer.accept(toRetry);
+ }
+
+ @Override
+ protected long getSizeInBytes(RedisWriteRequest request) {
+ return request.getSizeInBytes();
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java
new file mode 100644
index 0000000..caf9243
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java
@@ -0,0 +1,18 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+public class RedisConverter implements ElementConverter {
+
+ private final RedisSerializer serializer;
+
+ public RedisConverter(RedisSerializer serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public RedisWriteRequest apply(T input, SinkWriter.Context context) {
+ return serializer.serialize(input);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java
new file mode 100644
index 0000000..d1e323e
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java
@@ -0,0 +1,10 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public interface RedisSerializer extends Function, Serializable {
+
+ RedisWriteRequest serialize(T input);
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
new file mode 100644
index 0000000..461987a
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
@@ -0,0 +1,31 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.redis.JedisUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class RedisSink implements Sink {
+
+ private final Properties config;
+ private final RedisSerializer serializer;
+
+ private final RedisSinkConfig sinkConfig;
+
+ public RedisSink(Properties config, RedisSinkConfig sinkConfig, RedisSerializer serializer) {
+ this.config = config;
+ this.sinkConfig = sinkConfig;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public SinkWriter createWriter(Sink.InitContext initContext) throws IOException {
+ return new RedisAsyncWriter<>(
+ JedisUtils.createJedis(config),
+ new RedisConverter<>(serializer),
+ sinkConfig,
+ initContext);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java
new file mode 100644
index 0000000..8d3d015
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java
@@ -0,0 +1,81 @@
+package org.apache.flink.connector.redis.sink2;
+
+import java.io.Serializable;
+
+public class RedisSinkConfig implements Serializable {
+
+ public final int maxBatchSize;
+ public final int maxInFlightRequests;
+ public final int maxBufferedRequests;
+ public final long maxBatchSizeInBytes;
+ public final long maxTimeInBufferMS;
+ public final long maxRecordSizeInBytes;
+
+ public RedisSinkConfig(
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes) {
+ this.maxBatchSize = maxBatchSize;
+ this.maxInFlightRequests = maxInFlightRequests;
+ this.maxBufferedRequests = maxBufferedRequests;
+ this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
+ this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private int maxBatchSize = 10;
+ private int maxInFlightRequests = 1;
+ private int maxBufferedRequests = 100;
+ private long maxBatchSizeInBytes = 110;
+ private long maxTimeInBufferMS = 1_000;
+ private long maxRecordSizeInBytes = maxBatchSizeInBytes;
+
+ public Builder maxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ public Builder maxInFlightRequests(int maxInFlightRequests) {
+ this.maxInFlightRequests = maxInFlightRequests;
+ return this;
+ }
+
+ public Builder maxBufferedRequests(int maxBufferedRequests) {
+ this.maxBufferedRequests = maxBufferedRequests;
+ return this;
+ }
+
+ public Builder maxBatchSizeInBytes(long maxBatchSizeInBytes) {
+ this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+ return this;
+ }
+
+ public Builder maxTimeInBufferMS(long maxTimeInBufferMS) {
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
+ return this;
+ }
+
+ public Builder maxRecordSizeInBytes(long maxRecordSizeInBytes) {
+ this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+ return this;
+ }
+
+ public RedisSinkConfig build() {
+ return new RedisSinkConfig(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
new file mode 100644
index 0000000..5281c9e
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
@@ -0,0 +1,49 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import redis.clients.jedis.UnifiedJedis;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class RedisSyncWriter implements SinkWriter {
+
+ private final UnifiedJedis jedis;
+
+ private final RedisSerializer serializer;
+
+ private final boolean writeImmediate;
+
+ private final Queue queue = new ArrayDeque<>();
+
+ public RedisSyncWriter(
+ UnifiedJedis jedis, RedisSerializer serializer, boolean writeImmediate) {
+ this.jedis = jedis;
+ this.serializer = serializer;
+ this.writeImmediate = writeImmediate;
+ }
+
+ @Override
+ public void write(IN input, Context context) throws IOException, InterruptedException {
+ RedisWriteRequest request = serializer.serialize(input);
+ if (writeImmediate) {
+ request.write(jedis);
+ } else {
+ queue.add(request);
+ }
+ }
+
+ @Override
+ public void flush(boolean b) throws IOException, InterruptedException {
+ while (!queue.isEmpty()) {
+ RedisWriteRequest request = queue.remove();
+ request.write(jedis);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ jedis.close();
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
new file mode 100644
index 0000000..df9d01e
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
@@ -0,0 +1,11 @@
+package org.apache.flink.connector.redis.sink2;
+
+import redis.clients.jedis.UnifiedJedis;
+
+import java.io.Serializable;
+
+public interface RedisWriteRequest extends Serializable {
+ void write(UnifiedJedis jedis);
+
+ long getSizeInBytes();
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
new file mode 100644
index 0000000..897cce2
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
@@ -0,0 +1,31 @@
+package org.apache.flink.connector.redis.sink2;
+
+import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.params.XAddParams;
+
+import java.util.Map;
+
+public class StreamWriteRequest implements RedisWriteRequest {
+
+ private final String key;
+ private final Map value;
+
+ public StreamWriteRequest(String key, Map value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void write(UnifiedJedis jedis) {
+ jedis.xadd(key, XAddParams.xAddParams(), value);
+ }
+
+ @Override
+ public long getSizeInBytes() {
+ long size = 9;
+ for (Map.Entry entry: value.entrySet()) {
+ size += 10 + entry.getKey().length() + entry.getValue().length();
+ }
+ return size;
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
new file mode 100644
index 0000000..2db14af
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
@@ -0,0 +1,24 @@
+package org.apache.flink.connector.redis.sink2;
+
+import redis.clients.jedis.UnifiedJedis;
+
+public class StringWriteRequest implements RedisWriteRequest {
+
+ private final String key;
+ private final String value;
+
+ public StringWriteRequest(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void write(UnifiedJedis jedis) {
+ jedis.set(key, value);
+ }
+
+ @Override
+ public long getSizeInBytes() {
+ return 18 + key.length() + value.length();
+ }
+}
diff --git a/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java b/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..54442a7
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.UnifiedJedis;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** */
+@Testcontainers
+public abstract class RedisITCaseBase extends AbstractTestBase {
+
+ public static final String REDIS_IMAGE = "redis";
+ private static final int REDIS_PORT = 6379;
+
+ private static final AtomicBoolean running = new AtomicBoolean(false);
+
+ @Container
+ private static final GenericContainer> redis =
+ new GenericContainer<>(DockerImageName.parse(REDIS_IMAGE)).withExposedPorts(REDIS_PORT);
+
+ public static MiniClusterWithClientResource cluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(2)
+ .setNumberTaskManagers(1)
+ .build());
+ protected UnifiedJedis jedis;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ cluster.before();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ cluster.after();
+ }
+
+ protected static synchronized void start() {
+ if (!running.get()) {
+ redis.start();
+ running.set(true);
+ }
+ }
+
+ protected static void stop() {
+ redis.stop();
+ running.set(false);
+ }
+
+ @BeforeEach
+ public void setUp() {
+ Properties config = getConfigProperties();
+ try (Jedis j =
+ new Jedis(
+ config.getProperty(JedisConfigConstants.REDIS_HOST),
+ Integer.parseInt(config.getProperty(JedisConfigConstants.REDIS_PORT)))) {
+ j.flushAll();
+ }
+
+ jedis = JedisUtils.createJedis(config);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ protected Properties getConfigProperties() {
+ start();
+
+ Properties configProps = new Properties();
+ // configProps.setProperty(RedisConfigConstants.REDIS_HOST, redis.getContainerIpAddress());
+ configProps.setProperty(JedisConfigConstants.REDIS_HOST, redis.getHost());
+ configProps.setProperty(
+ JedisConfigConstants.REDIS_PORT, Integer.toString(redis.getFirstMappedPort()));
+ return configProps;
+ }
+}
diff --git a/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java b/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java
new file mode 100644
index 0000000..581b204
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java
@@ -0,0 +1,61 @@
+package org.apache.flink.connector.redis.sink2;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.redis.RedisITCaseBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RedisSinkTest extends RedisITCaseBase {
+
+ @Test
+ public void testStringCommand() throws Exception {
+ RedisSerializer serializer = input -> new StringWriteRequest("key-"+input, "value-"+input);
+ RedisSinkConfig redisSinkConfig = RedisSinkConfig.builder().build();
+ RedisSink underTest = new RedisSink<>(getConfigProperties(), redisSinkConfig, serializer);
+
+ List source = Arrays.asList("one", "two", "three");
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.fromCollection(source).sinkTo(underTest);
+ env.execute();
+
+ // verify results
+ source.forEach(entry -> assertEquals("value-" + entry, jedis.get("key-"+entry)));
+ }
+
+ @Test
+ public void testStreamCommand() throws Exception {
+ RedisSerializer> serializer =
+ input -> {
+ Map value = new HashMap<>();
+ value.put(input.f1, input.f2);
+ return new StreamWriteRequest(input.f0, value);
+ };
+ RedisSinkConfig redisSinkConfig = RedisSinkConfig.builder().build();
+ RedisSink> underTest =
+ new RedisSink<>(getConfigProperties(), redisSinkConfig, serializer);
+
+ List> source =
+ Arrays.asList(
+ Tuple3.of("one", "onekey", "onevalue"),
+ Tuple3.of("two", "firstkey", "firstvalue"),
+ Tuple3.of("two", "secontkey", "secondvalue"));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.fromCollection(source).sinkTo(underTest);
+ env.execute();
+
+ // verify results
+ assertEquals(1, jedis.xlen("one"));
+ assertEquals(2, jedis.xlen("two"));
+ }
+}
From 9631808243a676729870f1576360b1d09624165c Mon Sep 17 00:00:00 2001
From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>
Date: Fri, 26 Aug 2022 21:35:34 +0600
Subject: [PATCH 2/2] use Pipeline
---
.../flink/connector/redis/JedisUtils.java | 16 ++++++
.../redis/sink2/RedisAsyncWriter.java | 2 +-
.../connector/redis/sink2/RedisSink.java | 11 ++--
.../redis/sink2/RedisSyncWriter.java | 52 +++++++++++++------
.../redis/sink2/RedisWriteRequest.java | 7 ++-
.../redis/sink2/StreamWriteRequest.java | 10 +++-
.../redis/sink2/StringWriteRequest.java | 10 +++-
7 files changed, 80 insertions(+), 28 deletions(-)
diff --git a/src/main/java/org/apache/flink/connector/redis/JedisUtils.java b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
index 8cbdc79..d5c92e0 100644
--- a/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
+++ b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.redis;
+import redis.clients.jedis.Connection;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.UnifiedJedis;
@@ -39,4 +40,19 @@ public static UnifiedJedis createJedis(Properties configProps) {
return new UnifiedJedis(new HostAndPort(host, port));
}
+
+ public static Connection createConnection(Properties configProps) {
+
+ String host =
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_HOST, JedisConfigConstants.DEFAULT_REDIS_HOST);
+
+ int port =
+ Integer.parseInt(
+ configProps.getProperty(
+ JedisConfigConstants.REDIS_PORT,
+ Integer.toString(JedisConfigConstants.DEFAULT_REDIS_PORT)));
+
+ return new Connection(host, port);
+ }
}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
index 53ec1f7..1a49485 100644
--- a/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java
@@ -39,7 +39,7 @@ protected void submitRequestEntries(
request.write(jedis);
} catch (JedisDataException de) {
// not worth retrying; will fail again
- // TODO
+ // TODO ?
} catch (Exception e) {
toRetry.add(request);
}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
index 461987a..bb6f63b 100644
--- a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java
@@ -22,10 +22,11 @@ public RedisSink(Properties config, RedisSinkConfig sinkConfig, RedisSerializer<
@Override
public SinkWriter createWriter(Sink.InitContext initContext) throws IOException {
- return new RedisAsyncWriter<>(
- JedisUtils.createJedis(config),
- new RedisConverter<>(serializer),
- sinkConfig,
- initContext);
+// return new RedisAsyncWriter<>(
+// JedisUtils.createJedis(config),
+// new RedisConverter<>(serializer),
+// sinkConfig,
+// initContext);
+ return new RedisSyncWriter(JedisUtils.createConnection(config), serializer);
}
}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
index 5281c9e..0810c7f 100644
--- a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java
@@ -1,49 +1,69 @@
package org.apache.flink.connector.redis.sink2;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.Connection;
+import redis.clients.jedis.MultiNodePipelineBase;
+import redis.clients.jedis.Pipeline;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
public class RedisSyncWriter implements SinkWriter {
- private final UnifiedJedis jedis;
+ private final Connection conn;
+ private final Pipeline pipe;
+ private final MultiNodePipelineBase mpipe;
private final RedisSerializer serializer;
- private final boolean writeImmediate;
+ public RedisSyncWriter(
+ Connection connection, RedisSerializer serializer) {
+ this.conn = connection;
+ this.pipe = new Pipeline(this.conn);
+ this.mpipe = null;
+ this.serializer = serializer;
+ }
- private final Queue queue = new ArrayDeque<>();
+ public RedisSyncWriter(
+ Pipeline pipe, RedisSerializer serializer) {
+ this.conn = null;
+ this.pipe = pipe;
+ this.mpipe = null;
+ this.serializer = serializer;
+ }
public RedisSyncWriter(
- UnifiedJedis jedis, RedisSerializer serializer, boolean writeImmediate) {
- this.jedis = jedis;
+ MultiNodePipelineBase mpipe, RedisSerializer serializer) {
+ this.conn = null;
+ this.pipe = null;
+ this.mpipe = mpipe;
this.serializer = serializer;
- this.writeImmediate = writeImmediate;
}
@Override
public void write(IN input, Context context) throws IOException, InterruptedException {
RedisWriteRequest request = serializer.serialize(input);
- if (writeImmediate) {
- request.write(jedis);
+ if (pipe != null) {
+ request.write(pipe);
} else {
- queue.add(request);
+ request.write(mpipe);
}
}
@Override
public void flush(boolean b) throws IOException, InterruptedException {
- while (!queue.isEmpty()) {
- RedisWriteRequest request = queue.remove();
- request.write(jedis);
+ if (pipe != null) {
+ pipe.sync();
+ } else {
+ mpipe.sync();
}
}
@Override
public void close() throws Exception {
- jedis.close();
+ if (conn != null) {
+ conn.close();
+ } else {
+ mpipe.close();
+ }
}
}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
index df9d01e..e651043 100644
--- a/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java
@@ -1,11 +1,14 @@
package org.apache.flink.connector.redis.sink2;
-import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.commands.JedisCommands;
+import redis.clients.jedis.commands.PipelineCommands;
import java.io.Serializable;
public interface RedisWriteRequest extends Serializable {
- void write(UnifiedJedis jedis);
+ void write(JedisCommands jedis);
+
+ void write(PipelineCommands pipe);
long getSizeInBytes();
}
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
index 897cce2..2dda450 100644
--- a/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java
@@ -1,6 +1,7 @@
package org.apache.flink.connector.redis.sink2;
-import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.commands.JedisCommands;
+import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.params.XAddParams;
import java.util.Map;
@@ -16,10 +17,15 @@ public StreamWriteRequest(String key, Map value) {
}
@Override
- public void write(UnifiedJedis jedis) {
+ public void write(JedisCommands jedis) {
jedis.xadd(key, XAddParams.xAddParams(), value);
}
+ @Override
+ public void write(PipelineCommands pipe) {
+ pipe.xadd(key, XAddParams.xAddParams(), value);
+ }
+
@Override
public long getSizeInBytes() {
long size = 9;
diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
index 2db14af..2d85b98 100644
--- a/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
+++ b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java
@@ -1,6 +1,7 @@
package org.apache.flink.connector.redis.sink2;
-import redis.clients.jedis.UnifiedJedis;
+import redis.clients.jedis.commands.JedisCommands;
+import redis.clients.jedis.commands.PipelineCommands;
public class StringWriteRequest implements RedisWriteRequest {
@@ -13,10 +14,15 @@ public StringWriteRequest(String key, String value) {
}
@Override
- public void write(UnifiedJedis jedis) {
+ public void write(JedisCommands jedis) {
jedis.set(key, value);
}
+ @Override
+ public void write(PipelineCommands pipe) {
+ pipe.set(key, value);
+ }
+
@Override
public long getSizeInBytes() {
return 18 + key.length() + value.length();