Skip to content

Commit 3c5044a

Browse files
committed
First commit
Mostly brought from apache/flink#15487
1 parent faa78e4 commit 3c5044a

22 files changed

+2020
-0
lines changed

pom.xml

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
6+
<modelVersion>4.0.0</modelVersion>
7+
8+
<groupId>org.apache.flink</groupId>
9+
<artifactId>flink-connector-redis</artifactId>
10+
<version>1.16-SNAPSHOT</version>
11+
<packaging>jar</packaging>
12+
<name>Flink : Connectors : Redis</name>
13+
14+
<licenses>
15+
<license>
16+
<name>The Apache Software License, Version 2.0</name>
17+
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
18+
<distribution>repo</distribution>
19+
</license>
20+
</licenses>
21+
22+
<properties>
23+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
24+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
25+
26+
<flink.version>1.16-SNAPSHOT</flink.version>
27+
<flink.shaded.version>15.0</flink.shaded.version>
28+
<target.java.version>1.8</target.java.version>
29+
<slf4j.version>1.7.32</slf4j.version>
30+
<maven.compiler.source>${target.java.version}</maven.compiler.source>
31+
<maven.compiler.target>${target.java.version}</maven.compiler.target>
32+
<!-- Default scala versions, must be overwritten by build profiles, so we set something
33+
invalid here -->
34+
<scala.version>2.12.7</scala.version>
35+
<scala.binary.version>2.12</scala.binary.version>
36+
<jedis.version>3.8.0</jedis.version>
37+
<slf4j.version>1.7.32</slf4j.version>
38+
<testcontainers.version>1.16.2</testcontainers.version>
39+
<junit.version>4.13.2</junit.version>
40+
<hamcrest.version>1.3</hamcrest.version>
41+
</properties>
42+
43+
<dependencies>
44+
45+
<!-- Redis dependencies -->
46+
47+
<dependency>
48+
<groupId>redis.clients</groupId>
49+
<artifactId>jedis</artifactId>
50+
<version>${jedis.version}</version>
51+
<scope>compile</scope>
52+
<exclusions>
53+
<exclusion>
54+
<groupId>org.slf4j</groupId>
55+
<artifactId>slf4j.api</artifactId>
56+
</exclusion>
57+
</exclusions>
58+
</dependency>
59+
60+
<!-- Logging dependencies -->
61+
62+
<dependency>
63+
<groupId>org.slf4j</groupId>
64+
<artifactId>slf4j-api</artifactId>
65+
<version>${slf4j.version}</version>
66+
</dependency>
67+
68+
<!-- Flink dependencies -->
69+
70+
<dependency>
71+
<groupId>org.apache.flink</groupId>
72+
<artifactId>flink-streaming-java</artifactId>
73+
<version>${flink.version}</version>
74+
<scope>provided</scope>
75+
</dependency>
76+
77+
<!-- Table ecosystem -->
78+
79+
<!-- Projects depending on this project won't depend on flink-table-*. -->
80+
<dependency>
81+
<groupId>org.apache.flink</groupId>
82+
<artifactId>flink-table-api-java-bridge</artifactId>
83+
<version>${flink.version}</version>
84+
<scope>provided</scope>
85+
<optional>true</optional>
86+
</dependency>
87+
88+
<dependency>
89+
<groupId>org.apache.flink</groupId>
90+
<artifactId>flink-test-utils</artifactId>
91+
<version>${flink.version}</version>
92+
<scope>test</scope>
93+
<exclusions>
94+
<exclusion>
95+
<groupId>log4j</groupId>
96+
<artifactId>log4j</artifactId>
97+
</exclusion>
98+
<exclusion>
99+
<groupId>org.slf4j</groupId>
100+
<artifactId>slf4j-log4j12</artifactId>
101+
</exclusion>
102+
</exclusions>
103+
</dependency>
104+
105+
<!-- Test dependencies -->
106+
107+
<dependency>
108+
<groupId>org.testcontainers</groupId>
109+
<artifactId>testcontainers</artifactId>
110+
<version>${testcontainers.version}</version>
111+
<scope>test</scope>
112+
</dependency>
113+
114+
<dependency>
115+
<groupId>junit</groupId>
116+
<artifactId>junit</artifactId>
117+
<version>${junit.version}</version>
118+
<type>jar</type>
119+
<scope>test</scope>
120+
</dependency>
121+
122+
</dependencies>
123+
124+
<build>
125+
<plugins>
126+
<plugin>
127+
<groupId>org.apache.maven.plugins</groupId>
128+
<artifactId>maven-surefire-plugin</artifactId>
129+
<configuration>
130+
<includes>
131+
<include>%regex[.*ITCase.*]</include>
132+
</includes>
133+
</configuration>
134+
</plugin>
135+
</plugins>
136+
</build>
137+
138+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.streaming.connectors.redis;
19+
20+
import org.apache.flink.streaming.connectors.redis.config.StartupMode;
21+
22+
import redis.clients.jedis.Jedis;
23+
import redis.clients.jedis.StreamEntry;
24+
import redis.clients.jedis.StreamEntryID;
25+
26+
import java.util.AbstractMap.SimpleEntry;
27+
import java.util.Arrays;
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Map.Entry;
32+
import java.util.Properties;
33+
34+
/** @param <T> */
35+
public abstract class AbstractRedisStreamConsumer<T> extends RedisConsumerBase<T> {
36+
37+
protected final Entry<String, StreamEntryID>[] streamEntryIds;
38+
private final Map<String, Integer> keyIndex = new HashMap<>();
39+
40+
public AbstractRedisStreamConsumer(
41+
StartupMode startupMode, String[] streamKeys, Properties configProps) {
42+
super(Arrays.asList(streamKeys), configProps);
43+
final StreamEntryID streamEntryID;
44+
switch (startupMode) {
45+
case EARLIEST:
46+
streamEntryID = new StreamEntryID();
47+
break;
48+
case LATEST:
49+
streamEntryID = StreamEntryID.LAST_ENTRY;
50+
break;
51+
case GROUP_OFFSETS:
52+
streamEntryID = StreamEntryID.UNRECEIVED_ENTRY;
53+
break;
54+
case SPECIFIC_OFFSETS:
55+
throw new RuntimeException(
56+
"Use the constructor with 'StreamEntryID[] streamIds' as param");
57+
case TIMESTAMP:
58+
throw new RuntimeException("Use the constructor with 'Long[] timestamps' param");
59+
default:
60+
throw new IllegalStateException();
61+
}
62+
this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID);
63+
initializeKeyIndex();
64+
}
65+
66+
public AbstractRedisStreamConsumer(
67+
String[] streamKeys, Long[] timestamps, Properties configProps) {
68+
this(streamKeys, streamEntryIds(timestamps), configProps);
69+
}
70+
71+
public AbstractRedisStreamConsumer(
72+
String[] streamKeys, StreamEntryID[] streamIds, Properties configProps) {
73+
this(prepareStreamEntryIds(streamKeys, streamIds), configProps);
74+
}
75+
76+
private AbstractRedisStreamConsumer(
77+
Entry<String, StreamEntryID>[] streamIds, Properties configProps) {
78+
super(null, configProps);
79+
this.streamEntryIds = streamIds;
80+
initializeKeyIndex();
81+
}
82+
83+
@Override
84+
protected final boolean readAndCollect(
85+
Jedis jedis, List<String> streamKeys, SourceContext<T> sourceContext) {
86+
boolean anyEntry = false;
87+
List<Entry<String, List<StreamEntry>>> response = read(jedis);
88+
if (response != null) {
89+
for (Entry<String, List<StreamEntry>> streamEntries : response) {
90+
String streamKey = streamEntries.getKey();
91+
for (StreamEntry entry : streamEntries.getValue()) {
92+
anyEntry = true;
93+
collect(sourceContext, streamKey, entry);
94+
updateIdForKey(streamKey, entry.getID());
95+
}
96+
}
97+
}
98+
return anyEntry;
99+
}
100+
101+
protected abstract List<Entry<String, List<StreamEntry>>> read(Jedis jedis);
102+
103+
protected abstract void collect(
104+
SourceContext<T> sourceContext, String streamKey, StreamEntry streamEntry);
105+
106+
protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) {
107+
int index = keyIndex.get(streamKey);
108+
if (this.streamEntryIds[index].getValue().toString().equals(">")) {
109+
// skip
110+
} else {
111+
this.streamEntryIds[index].setValue(streamEntryID);
112+
}
113+
}
114+
115+
private void initializeKeyIndex() {
116+
int index = 0;
117+
for (Entry<String, StreamEntryID> streamEntryId : streamEntryIds) {
118+
keyIndex.put(streamEntryId.getKey(), index++);
119+
}
120+
}
121+
122+
private static Entry<String, StreamEntryID>[] prepareStreamEntryIds(
123+
String[] streamKeys, StreamEntryID streamId) {
124+
Entry<?, ?>[] streams = new Entry<?, ?>[streamKeys.length];
125+
for (int i = 0; i < streamKeys.length; i++) {
126+
streams[i] = new SimpleEntry<>(streamKeys[i], streamId);
127+
}
128+
return (Entry<String, StreamEntryID>[]) streams;
129+
}
130+
131+
private static Entry<String, StreamEntryID>[] prepareStreamEntryIds(
132+
String[] streamKeys, StreamEntryID[] streamIds) {
133+
Entry<?, ?>[] streams = new Entry<?, ?>[streamKeys.length];
134+
for (int i = 0; i < streamKeys.length; i++) {
135+
streams[i] = new SimpleEntry<>(streamKeys[i], streamIds[i]);
136+
}
137+
return (Entry<String, StreamEntryID>[]) streams;
138+
}
139+
140+
private static StreamEntryID[] streamEntryIds(Long[] timestamps) {
141+
StreamEntryID[] entryIds = new StreamEntryID[timestamps.length];
142+
for (int i = 0; i < timestamps.length; i++) {
143+
entryIds[i] = new StreamEntryID(timestamps[i], 0L);
144+
}
145+
return entryIds;
146+
}
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.streaming.connectors.redis;
19+
20+
import java.io.Serializable;
21+
import java.util.Map;
22+
23+
/** @param <OUT> */
24+
public interface DataConverter<OUT> extends Serializable {
25+
26+
OUT toData(Map<String, String> input);
27+
}

0 commit comments

Comments
 (0)