Skip to content

Commit 3cbf521

Browse files
committed
Change xread/xreadGroup calls
1 parent d4e3caa commit 3cbf521

File tree

4 files changed

+71
-88
lines changed

4 files changed

+71
-88
lines changed

src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java

Lines changed: 27 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,27 @@
1818
package org.apache.flink.streaming.connectors.redis;
1919

2020
import org.apache.flink.streaming.connectors.redis.config.StartupMode;
21-
2221
import redis.clients.jedis.Jedis;
2322
import redis.clients.jedis.StreamEntry;
2423
import redis.clients.jedis.StreamEntryID;
2524

26-
import java.util.AbstractMap.SimpleEntry;
27-
import java.util.Arrays;
28-
import java.util.HashMap;
25+
import java.util.LinkedHashMap;
2926
import java.util.List;
3027
import java.util.Map;
3128
import java.util.Map.Entry;
3229
import java.util.Properties;
30+
import java.util.stream.Collectors;
3331

34-
/** @param <T> */
32+
/**
33+
* @param <T>
34+
*/
3535
public abstract class AbstractRedisStreamConsumer<T> extends RedisConsumerBase<T> {
3636

37-
protected final Entry<String, StreamEntryID>[] streamEntryIds;
38-
private final Map<String, Integer> keyIndex = new HashMap<>();
37+
protected final Map<String, StreamEntryID> streamEntryIds;
3938

4039
public AbstractRedisStreamConsumer(
41-
StartupMode startupMode, String[] streamKeys, Properties configProps) {
42-
super(Arrays.asList(streamKeys), configProps);
40+
StartupMode startupMode, List<String> streamKeys, Properties configProps) {
41+
super(streamKeys, configProps);
4342
final StreamEntryID streamEntryID;
4443
switch (startupMode) {
4544
case EARLIEST:
@@ -60,24 +59,17 @@ public AbstractRedisStreamConsumer(
6059
throw new IllegalStateException();
6160
}
6261
this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID);
63-
initializeKeyIndex();
64-
}
65-
66-
public AbstractRedisStreamConsumer(
67-
String[] streamKeys, Long[] timestamps, Properties configProps) {
68-
this(streamKeys, streamEntryIds(timestamps), configProps);
6962
}
7063

7164
public AbstractRedisStreamConsumer(
72-
String[] streamKeys, StreamEntryID[] streamIds, Properties configProps) {
65+
List<String> streamKeys, List<StreamEntryID> streamIds, Properties configProps) {
7366
this(prepareStreamEntryIds(streamKeys, streamIds), configProps);
7467
}
7568

7669
private AbstractRedisStreamConsumer(
77-
Entry<String, StreamEntryID>[] streamIds, Properties configProps) {
70+
Map<String, StreamEntryID> streamIds, Properties configProps) {
7871
super(null, configProps);
7972
this.streamEntryIds = streamIds;
80-
initializeKeyIndex();
8173
}
8274

8375
@Override
@@ -104,44 +96,32 @@ protected abstract void collect(
10496
SourceContext<T> sourceContext, String streamKey, StreamEntry streamEntry);
10597

10698
protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) {
107-
int index = keyIndex.get(streamKey);
108-
if (this.streamEntryIds[index].getValue().toString().equals(">")) {
99+
if (this.streamEntryIds.get(streamKey).toString().equals(">")) {
109100
// skip
110101
} else {
111-
this.streamEntryIds[index].setValue(streamEntryID);
102+
this.streamEntryIds.put(streamKey, streamEntryID);
112103
}
113104
}
114105

115-
private void initializeKeyIndex() {
116-
int index = 0;
117-
for (Entry<String, StreamEntryID> streamEntryId : streamEntryIds) {
118-
keyIndex.put(streamEntryId.getKey(), index++);
119-
}
106+
private static Map<String, StreamEntryID> prepareStreamEntryIds(
107+
List<String> streamKeys, StreamEntryID streamId) {
108+
Map<String, StreamEntryID> streams = new LinkedHashMap<>(streamKeys.size());
109+
streamKeys.forEach(streamKey -> streams.put(streamKey, streamId));
110+
return streams;
120111
}
121112

122-
private static Entry<String, StreamEntryID>[] prepareStreamEntryIds(
123-
String[] streamKeys, StreamEntryID streamId) {
124-
Entry<?, ?>[] streams = new Entry<?, ?>[streamKeys.length];
125-
for (int i = 0; i < streamKeys.length; i++) {
126-
streams[i] = new SimpleEntry<>(streamKeys[i], streamId);
113+
private static Map<String, StreamEntryID> prepareStreamEntryIds(
114+
List<String> streamKeys, List<StreamEntryID> streamIds) {
115+
Map<String, StreamEntryID> streams = new LinkedHashMap<>(streamKeys.size());
116+
for (int i = 0; i < streamKeys.size(); i++) {
117+
streams.put(streamKeys.get(i), streamIds.get(i));
127118
}
128-
return (Entry<String, StreamEntryID>[]) streams;
119+
return streams;
129120
}
130121

131-
private static Entry<String, StreamEntryID>[] prepareStreamEntryIds(
132-
String[] streamKeys, StreamEntryID[] streamIds) {
133-
Entry<?, ?>[] streams = new Entry<?, ?>[streamKeys.length];
134-
for (int i = 0; i < streamKeys.length; i++) {
135-
streams[i] = new SimpleEntry<>(streamKeys[i], streamIds[i]);
136-
}
137-
return (Entry<String, StreamEntryID>[]) streams;
138-
}
139-
140-
private static StreamEntryID[] streamEntryIds(Long[] timestamps) {
141-
StreamEntryID[] entryIds = new StreamEntryID[timestamps.length];
142-
for (int i = 0; i < timestamps.length; i++) {
143-
entryIds[i] = new StreamEntryID(timestamps[i], 0L);
144-
}
145-
return entryIds;
122+
public static List<StreamEntryID> convertToStreamEntryIDs(List<Long> timestamps) {
123+
return timestamps.stream()
124+
.map(ts -> new StreamEntryID(ts, 0L))
125+
.collect(Collectors.toList());
146126
}
147127
}

src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818
package org.apache.flink.streaming.connectors.redis;
1919

2020
import org.apache.flink.streaming.connectors.redis.config.StartupMode;
21-
2221
import redis.clients.jedis.Jedis;
2322
import redis.clients.jedis.StreamEntry;
2423
import redis.clients.jedis.StreamEntryID;
24+
import redis.clients.jedis.params.XReadParams;
2525

26+
import java.util.Arrays;
2627
import java.util.List;
2728
import java.util.Map.Entry;
2829
import java.util.Properties;
2930

30-
/** @param <T> */
31+
/**
32+
* @param <T>
33+
*/
3134
public class RedisStreamConsumer<T> extends AbstractRedisStreamConsumer<T> {
3235

3336
private final DataConverter<T> dataConverter;
@@ -36,32 +39,32 @@ public RedisStreamConsumer(
3639
Properties configProps,
3740
StartupMode startupMode,
3841
DataConverter<T> dataConverter,
39-
String... streamKeys) {
40-
super(startupMode, streamKeys, configProps);
41-
this.dataConverter = dataConverter;
42+
String streamKey) {
43+
this(configProps, startupMode, dataConverter, Arrays.asList(streamKey));
4244
}
4345

4446
public RedisStreamConsumer(
47+
Properties configProps,
48+
StartupMode startupMode,
4549
DataConverter<T> dataConverter,
46-
String[] streamKeys,
47-
Long[] timestamps,
48-
Properties configProps) {
49-
super(streamKeys, timestamps, configProps);
50+
List<String> streamKeys) {
51+
super(startupMode, streamKeys, configProps);
5052
this.dataConverter = dataConverter;
5153
}
5254

5355
public RedisStreamConsumer(
5456
DataConverter<T> dataConverter,
55-
String[] streamKeys,
56-
StreamEntryID[] streamIds,
57+
List<String> streamKeys,
58+
List<StreamEntryID> streamIds,
5759
Properties configProps) {
5860
super(streamKeys, streamIds, configProps);
5961
this.dataConverter = dataConverter;
6062
}
6163

6264
@Override
6365
protected List<Entry<String, List<StreamEntry>>> read(Jedis jedis) {
64-
return jedis.xread(1, 0L, streamEntryIds);
66+
// return jedis.xread(XReadParams.xReadParams().count(1).block(0), streamEntryIds);
67+
return jedis.xread(XReadParams.xReadParams().count(1), streamEntryIds);
6568
}
6669

6770
@Override

src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818
package org.apache.flink.streaming.connectors.redis;
1919

2020
import org.apache.flink.streaming.connectors.redis.config.StartupMode;
21-
2221
import redis.clients.jedis.Jedis;
2322
import redis.clients.jedis.StreamEntry;
2423
import redis.clients.jedis.StreamEntryID;
24+
import redis.clients.jedis.params.XReadGroupParams;
2525

26+
import java.util.Arrays;
2627
import java.util.List;
2728
import java.util.Map.Entry;
2829
import java.util.Properties;
2930

30-
/** @param <T> */
31+
/**
32+
* @param <T>
33+
*/
3134
public class RedisStreamGroupConsumer<T> extends AbstractRedisStreamConsumer<T> {
3235

3336
private final String group;
@@ -46,7 +49,7 @@ public RedisStreamGroupConsumer(
4649
consumerName,
4750
StartupMode.GROUP_OFFSETS,
4851
dataConverter,
49-
new String[] {streamKey},
52+
Arrays.asList(streamKey),
5053
config);
5154
}
5255

@@ -55,7 +58,7 @@ public RedisStreamGroupConsumer(
5558
String consumerName,
5659
StartupMode startupMode,
5760
DataConverter<T> dataConverter,
58-
String[] streamKeys,
61+
List<String> streamKeys,
5962
Properties config) {
6063
super(startupMode, streamKeys, config);
6164
this.group = groupName;
@@ -67,21 +70,8 @@ public RedisStreamGroupConsumer(
6770
String groupName,
6871
String consumerName,
6972
DataConverter<T> dataConverter,
70-
String[] streamKeys,
71-
Long[] timestamps,
72-
Properties config) {
73-
super(streamKeys, timestamps, config);
74-
this.group = groupName;
75-
this.consumer = consumerName;
76-
this.dataConverter = dataConverter;
77-
}
78-
79-
public RedisStreamGroupConsumer(
80-
String groupName,
81-
String consumerName,
82-
DataConverter<T> dataConverter,
83-
String[] streamKeys,
84-
StreamEntryID[] streamIds,
73+
List<String> streamKeys,
74+
List<StreamEntryID> streamIds,
8575
Properties config) {
8676
super(streamKeys, streamIds, config);
8777
this.group = groupName;
@@ -91,7 +81,12 @@ public RedisStreamGroupConsumer(
9181

9282
@Override
9383
protected List<Entry<String, List<StreamEntry>>> read(Jedis jedis) {
94-
return jedis.xreadGroup(group, consumer, 1, 0L, true, streamEntryIds);
84+
return jedis.xreadGroup(
85+
group,
86+
consumer,
87+
// XReadGroupParams.xReadGroupParams().count(1).block(0).noAck(),
88+
XReadGroupParams.xReadGroupParams().count(1).noAck(),
89+
streamEntryIds);
9590
}
9691

9792
@Override

src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.apache.flink.table.connector.source.SourceFunctionProvider;
2929
import org.apache.flink.table.data.RowData;
3030
import org.apache.flink.util.Preconditions;
31-
3231
import redis.clients.jedis.StreamEntryID;
3332

33+
import java.util.Arrays;
3434
import java.util.Objects;
3535
import java.util.Optional;
3636
import java.util.Properties;
@@ -291,36 +291,41 @@ protected AbstractRedisStreamConsumer<RowData> createRedisConsumer() {
291291
groupName.get(),
292292
consumerName.get(),
293293
converter,
294-
new String[] {streamKey},
295-
new Long[] {timestamp},
294+
Arrays.asList(streamKey),
295+
RedisStreamGroupConsumer.convertToStreamEntryIDs(
296+
Arrays.asList(timestamp)),
296297
config);
297298
case SPECIFIC_OFFSETS:
298299
return new RedisStreamGroupConsumer<>(
299300
groupName.get(),
300301
consumerName.get(),
301302
converter,
302-
new String[] {streamKey},
303-
new StreamEntryID[] {streamEntryId},
303+
Arrays.asList(streamKey),
304+
Arrays.asList(streamEntryId),
304305
config);
305306
default:
306307
return new RedisStreamGroupConsumer<>(
307308
groupName.get(),
308309
consumerName.get(),
309310
startupMode,
310311
converter,
311-
new String[] {streamKey},
312+
Arrays.asList(streamKey),
312313
config);
313314
}
314315
} else {
315316
switch (startupMode) {
316317
case TIMESTAMP:
317318
return new RedisStreamConsumer<>(
318-
converter, new String[] {streamKey}, new Long[] {timestamp}, config);
319+
converter,
320+
Arrays.asList(streamKey),
321+
AbstractRedisStreamConsumer.convertToStreamEntryIDs(
322+
Arrays.asList(timestamp)),
323+
config);
319324
case SPECIFIC_OFFSETS:
320325
return new RedisStreamConsumer<>(
321326
converter,
322-
new String[] {streamKey},
323-
new StreamEntryID[] {streamEntryId},
327+
Arrays.asList(streamKey),
328+
Arrays.asList(streamEntryId),
324329
config);
325330
default:
326331
return new RedisStreamConsumer<>(config, startupMode, converter, streamKey);

0 commit comments

Comments
 (0)