diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c008740 --- /dev/null +++ b/pom.xml @@ -0,0 +1,169 @@ + + + + 4.0.0 + + org.apache.flink + flink-connector-redis + 1.16-SNAPSHOT + jar + Flink : Connectors : Redis + + + + The Apache Software License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + UTF-8 + + + 1.15.1 + + 1.8 + ${target.java.version} + ${target.java.version} + + + + 4.2.3 + 1.7.36 + 1.17.3 + + + 5.8.2 + + + + + + + + redis.clients + jedis + ${jedis.version} + compile + + + org.slf4j + slf4j.api + + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + + org.apache.flink + flink-connector-base + ${flink.version} + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + %regex[.*ITCase.*] + + + + + + + \ No newline at end of file diff --git a/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java b/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java new file mode 100644 index 0000000..19b7e3e --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/JedisConfigConstants.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis; + +import redis.clients.jedis.Protocol; + +/** */ +public class JedisConfigConstants { + + public static final String REDIS_HOST = "redis.host"; + + public static final String REDIS_PORT = "redis.port"; + + public static final String DEFAULT_REDIS_HOST = Protocol.DEFAULT_HOST; + + public static final int DEFAULT_REDIS_PORT = Protocol.DEFAULT_PORT; +} diff --git a/src/main/java/org/apache/flink/connector/redis/JedisUtils.java b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java new file mode 100644 index 0000000..d5c92e0 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/JedisUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis; + +import redis.clients.jedis.Connection; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.UnifiedJedis; + +import java.util.Properties; + +/** */ +public class JedisUtils { + + public static UnifiedJedis createJedis(Properties configProps) { + + String host = + configProps.getProperty( + JedisConfigConstants.REDIS_HOST, JedisConfigConstants.DEFAULT_REDIS_HOST); + + int port = + Integer.parseInt( + configProps.getProperty( + JedisConfigConstants.REDIS_PORT, + Integer.toString(JedisConfigConstants.DEFAULT_REDIS_PORT))); + + return new UnifiedJedis(new HostAndPort(host, port)); + } + + public static Connection createConnection(Properties configProps) { + + String host = + configProps.getProperty( + JedisConfigConstants.REDIS_HOST, JedisConfigConstants.DEFAULT_REDIS_HOST); + + int port = + Integer.parseInt( + configProps.getProperty( + JedisConfigConstants.REDIS_PORT, + Integer.toString(JedisConfigConstants.DEFAULT_REDIS_PORT))); + + return new Connection(host, port); + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java new file mode 100644 index 0000000..1a49485 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisAsyncWriter.java @@ -0,0 +1,54 @@ +package org.apache.flink.connector.redis.sink2; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.exceptions.JedisDataException; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class RedisAsyncWriter extends AsyncSinkWriter { + + private final UnifiedJedis jedis; + + public RedisAsyncWriter( + UnifiedJedis jedis, + RedisConverter converter, + RedisSinkConfig sinkConfig, + Sink.InitContext context) { + super( + converter, + context, + sinkConfig.maxBatchSize, + sinkConfig.maxInFlightRequests, + sinkConfig.maxBufferedRequests, + sinkConfig.maxBatchSizeInBytes, + sinkConfig.maxTimeInBufferMS, + sinkConfig.maxRecordSizeInBytes); + this.jedis = jedis; + } + + @Override + protected void submitRequestEntries( + List requests, Consumer> consumer) { + List toRetry = new ArrayList<>(); + for (RedisWriteRequest request : requests) { + try { + request.write(jedis); + } catch (JedisDataException de) { + // not worth retrying; will fail again + // TODO ? + } catch (Exception e) { + toRetry.add(request); + } + } + consumer.accept(toRetry); + } + + @Override + protected long getSizeInBytes(RedisWriteRequest request) { + return request.getSizeInBytes(); + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java new file mode 100644 index 0000000..caf9243 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisConverter.java @@ -0,0 +1,18 @@ +package org.apache.flink.connector.redis.sink2; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +public class RedisConverter implements ElementConverter { + + private final RedisSerializer serializer; + + public RedisConverter(RedisSerializer serializer) { + this.serializer = serializer; + } + + @Override + public RedisWriteRequest apply(T input, SinkWriter.Context context) { + return serializer.serialize(input); + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java new file mode 100644 index 0000000..d1e323e --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java @@ -0,0 +1,10 @@ +package org.apache.flink.connector.redis.sink2; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public interface RedisSerializer extends Function, Serializable { + + RedisWriteRequest serialize(T input); +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java new file mode 100644 index 0000000..bb6f63b --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSink.java @@ -0,0 +1,32 @@ +package org.apache.flink.connector.redis.sink2; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.redis.JedisUtils; + +import java.io.IOException; +import java.util.Properties; + +public class RedisSink implements Sink { + + private final Properties config; + private final RedisSerializer serializer; + + private final RedisSinkConfig sinkConfig; + + public RedisSink(Properties config, RedisSinkConfig sinkConfig, RedisSerializer serializer) { + this.config = config; + this.sinkConfig = sinkConfig; + this.serializer = serializer; + } + + @Override + public SinkWriter createWriter(Sink.InitContext initContext) throws IOException { +// return new RedisAsyncWriter<>( +// JedisUtils.createJedis(config), +// new RedisConverter<>(serializer), +// sinkConfig, +// initContext); + return new RedisSyncWriter(JedisUtils.createConnection(config), serializer); + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java new file mode 100644 index 0000000..8d3d015 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSinkConfig.java @@ -0,0 +1,81 @@ +package org.apache.flink.connector.redis.sink2; + +import java.io.Serializable; + +public class RedisSinkConfig implements Serializable { + + public final int maxBatchSize; + public final int maxInFlightRequests; + public final int maxBufferedRequests; + public final long maxBatchSizeInBytes; + public final long maxTimeInBufferMS; + public final long maxRecordSizeInBytes; + + public RedisSinkConfig( + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes) { + this.maxBatchSize = maxBatchSize; + this.maxInFlightRequests = maxInFlightRequests; + this.maxBufferedRequests = maxBufferedRequests; + this.maxBatchSizeInBytes = maxBatchSizeInBytes; + this.maxTimeInBufferMS = maxTimeInBufferMS; + this.maxRecordSizeInBytes = maxRecordSizeInBytes; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private int maxBatchSize = 10; + private int maxInFlightRequests = 1; + private int maxBufferedRequests = 100; + private long maxBatchSizeInBytes = 110; + private long maxTimeInBufferMS = 1_000; + private long maxRecordSizeInBytes = maxBatchSizeInBytes; + + public Builder maxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + + public Builder maxInFlightRequests(int maxInFlightRequests) { + this.maxInFlightRequests = maxInFlightRequests; + return this; + } + + public Builder maxBufferedRequests(int maxBufferedRequests) { + this.maxBufferedRequests = maxBufferedRequests; + return this; + } + + public Builder maxBatchSizeInBytes(long maxBatchSizeInBytes) { + this.maxBatchSizeInBytes = maxBatchSizeInBytes; + return this; + } + + public Builder maxTimeInBufferMS(long maxTimeInBufferMS) { + this.maxTimeInBufferMS = maxTimeInBufferMS; + return this; + } + + public Builder maxRecordSizeInBytes(long maxRecordSizeInBytes) { + this.maxRecordSizeInBytes = maxRecordSizeInBytes; + return this; + } + + public RedisSinkConfig build() { + return new RedisSinkConfig( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes); + } + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java new file mode 100644 index 0000000..0810c7f --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisSyncWriter.java @@ -0,0 +1,69 @@ +package org.apache.flink.connector.redis.sink2; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import redis.clients.jedis.Connection; +import redis.clients.jedis.MultiNodePipelineBase; +import redis.clients.jedis.Pipeline; + +import java.io.IOException; + +public class RedisSyncWriter implements SinkWriter { + + private final Connection conn; + private final Pipeline pipe; + private final MultiNodePipelineBase mpipe; + + private final RedisSerializer serializer; + + public RedisSyncWriter( + Connection connection, RedisSerializer serializer) { + this.conn = connection; + this.pipe = new Pipeline(this.conn); + this.mpipe = null; + this.serializer = serializer; + } + + public RedisSyncWriter( + Pipeline pipe, RedisSerializer serializer) { + this.conn = null; + this.pipe = pipe; + this.mpipe = null; + this.serializer = serializer; + } + + public RedisSyncWriter( + MultiNodePipelineBase mpipe, RedisSerializer serializer) { + this.conn = null; + this.pipe = null; + this.mpipe = mpipe; + this.serializer = serializer; + } + + @Override + public void write(IN input, Context context) throws IOException, InterruptedException { + RedisWriteRequest request = serializer.serialize(input); + if (pipe != null) { + request.write(pipe); + } else { + request.write(mpipe); + } + } + + @Override + public void flush(boolean b) throws IOException, InterruptedException { + if (pipe != null) { + pipe.sync(); + } else { + mpipe.sync(); + } + } + + @Override + public void close() throws Exception { + if (conn != null) { + conn.close(); + } else { + mpipe.close(); + } + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java new file mode 100644 index 0000000..e651043 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/RedisWriteRequest.java @@ -0,0 +1,14 @@ +package org.apache.flink.connector.redis.sink2; + +import redis.clients.jedis.commands.JedisCommands; +import redis.clients.jedis.commands.PipelineCommands; + +import java.io.Serializable; + +public interface RedisWriteRequest extends Serializable { + void write(JedisCommands jedis); + + void write(PipelineCommands pipe); + + long getSizeInBytes(); +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java new file mode 100644 index 0000000..2dda450 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/StreamWriteRequest.java @@ -0,0 +1,37 @@ +package org.apache.flink.connector.redis.sink2; + +import redis.clients.jedis.commands.JedisCommands; +import redis.clients.jedis.commands.PipelineCommands; +import redis.clients.jedis.params.XAddParams; + +import java.util.Map; + +public class StreamWriteRequest implements RedisWriteRequest { + + private final String key; + private final Map value; + + public StreamWriteRequest(String key, Map value) { + this.key = key; + this.value = value; + } + + @Override + public void write(JedisCommands jedis) { + jedis.xadd(key, XAddParams.xAddParams(), value); + } + + @Override + public void write(PipelineCommands pipe) { + pipe.xadd(key, XAddParams.xAddParams(), value); + } + + @Override + public long getSizeInBytes() { + long size = 9; + for (Map.Entry entry: value.entrySet()) { + size += 10 + entry.getKey().length() + entry.getValue().length(); + } + return size; + } +} diff --git a/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java new file mode 100644 index 0000000..2d85b98 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/redis/sink2/StringWriteRequest.java @@ -0,0 +1,30 @@ +package org.apache.flink.connector.redis.sink2; + +import redis.clients.jedis.commands.JedisCommands; +import redis.clients.jedis.commands.PipelineCommands; + +public class StringWriteRequest implements RedisWriteRequest { + + private final String key; + private final String value; + + public StringWriteRequest(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public void write(JedisCommands jedis) { + jedis.set(key, value); + } + + @Override + public void write(PipelineCommands pipe) { + pipe.set(key, value); + } + + @Override + public long getSizeInBytes() { + return 18 + key.length() + value.length(); + } +} diff --git a/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java b/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java new file mode 100644 index 0000000..54442a7 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/redis/RedisITCaseBase.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.redis; + +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.UnifiedJedis; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +/** */ +@Testcontainers +public abstract class RedisITCaseBase extends AbstractTestBase { + + public static final String REDIS_IMAGE = "redis"; + private static final int REDIS_PORT = 6379; + + private static final AtomicBoolean running = new AtomicBoolean(false); + + @Container + private static final GenericContainer redis = + new GenericContainer<>(DockerImageName.parse(REDIS_IMAGE)).withExposedPorts(REDIS_PORT); + + public static MiniClusterWithClientResource cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(2) + .setNumberTaskManagers(1) + .build()); + protected UnifiedJedis jedis; + + @BeforeAll + public static void beforeAll() throws Exception { + cluster.before(); + } + + @AfterAll + public static void afterAll() { + cluster.after(); + } + + protected static synchronized void start() { + if (!running.get()) { + redis.start(); + running.set(true); + } + } + + protected static void stop() { + redis.stop(); + running.set(false); + } + + @BeforeEach + public void setUp() { + Properties config = getConfigProperties(); + try (Jedis j = + new Jedis( + config.getProperty(JedisConfigConstants.REDIS_HOST), + Integer.parseInt(config.getProperty(JedisConfigConstants.REDIS_PORT)))) { + j.flushAll(); + } + + jedis = JedisUtils.createJedis(config); + } + + @AfterEach + public void tearDown() { + if (jedis != null) { + jedis.close(); + } + } + + protected Properties getConfigProperties() { + start(); + + Properties configProps = new Properties(); + // configProps.setProperty(RedisConfigConstants.REDIS_HOST, redis.getContainerIpAddress()); + configProps.setProperty(JedisConfigConstants.REDIS_HOST, redis.getHost()); + configProps.setProperty( + JedisConfigConstants.REDIS_PORT, Integer.toString(redis.getFirstMappedPort())); + return configProps; + } +} diff --git a/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java b/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java new file mode 100644 index 0000000..581b204 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/redis/sink2/RedisSinkTest.java @@ -0,0 +1,61 @@ +package org.apache.flink.connector.redis.sink2; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.connector.redis.RedisITCaseBase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RedisSinkTest extends RedisITCaseBase { + + @Test + public void testStringCommand() throws Exception { + RedisSerializer serializer = input -> new StringWriteRequest("key-"+input, "value-"+input); + RedisSinkConfig redisSinkConfig = RedisSinkConfig.builder().build(); + RedisSink underTest = new RedisSink<>(getConfigProperties(), redisSinkConfig, serializer); + + List source = Arrays.asList("one", "two", "three"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.fromCollection(source).sinkTo(underTest); + env.execute(); + + // verify results + source.forEach(entry -> assertEquals("value-" + entry, jedis.get("key-"+entry))); + } + + @Test + public void testStreamCommand() throws Exception { + RedisSerializer> serializer = + input -> { + Map value = new HashMap<>(); + value.put(input.f1, input.f2); + return new StreamWriteRequest(input.f0, value); + }; + RedisSinkConfig redisSinkConfig = RedisSinkConfig.builder().build(); + RedisSink> underTest = + new RedisSink<>(getConfigProperties(), redisSinkConfig, serializer); + + List> source = + Arrays.asList( + Tuple3.of("one", "onekey", "onevalue"), + Tuple3.of("two", "firstkey", "firstvalue"), + Tuple3.of("two", "secontkey", "secondvalue")); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.fromCollection(source).sinkTo(underTest); + env.execute(); + + // verify results + assertEquals(1, jedis.xlen("one")); + assertEquals(2, jedis.xlen("two")); + } +}