Skip to content

Commit 8cf4e11

Browse files
iliaxiliax
and
iliax
authored
BE: Messages: Fix first offsets retrieval for compacted topic (#406)
Co-authored-by: iliax <ilya.kuramshin@almatech.dev>
1 parent c06385d commit 8cf4e11

File tree

2 files changed

+32
-4
lines changed

2 files changed

+32
-4
lines changed

api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
import java.util.HashSet;
77
import java.util.Map;
88
import java.util.Set;
9+
import java.util.stream.Collectors;
910
import lombok.Getter;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.apache.commons.lang3.mutable.MutableLong;
1213
import org.apache.kafka.clients.consumer.Consumer;
1314
import org.apache.kafka.common.TopicPartition;
15+
import org.apache.kafka.common.errors.UnsupportedVersionException;
1416

1517
@Slf4j
1618
@Getter
@@ -34,7 +36,7 @@ class OffsetsInfo {
3436

3537
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
3638
this.consumer = consumer;
37-
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
39+
this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions);
3840
this.endOffsets = consumer.endOffsets(targetPartitions);
3941
endOffsets.forEach((tp, endOffset) -> {
4042
var beginningOffset = beginOffsets.get(tp);
@@ -46,6 +48,28 @@ class OffsetsInfo {
4648
});
4749
}
4850

51+
52+
private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer,
53+
Collection<TopicPartition> partitions) {
54+
try {
55+
// we try to use offsetsForTimes() to find earliest offsets, since for
56+
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
57+
// even when effectively first offset can be very high
58+
var offsets = consumer.offsetsForTimes(
59+
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
60+
);
61+
// result of offsetsForTimes() can be null, if message version < 0.10.0
62+
if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) {
63+
return offsets.entrySet().stream()
64+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
65+
}
66+
} catch (UnsupportedOperationException | UnsupportedVersionException e) {
67+
// offsetsForTimes() not supported
68+
}
69+
//falling back to beginningOffsets() if offsetsForTimes() not supported
70+
return consumer.beginningOffsets(partitions);
71+
}
72+
4973
boolean assignedPartitionsFullyPolled() {
5074
for (var tp : consumer.assignment()) {
5175
Preconditions.checkArgument(endOffsets.containsKey(tp));

api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import io.kafbat.ui.model.ConsumerPosition;
44
import io.kafbat.ui.model.TopicMessageEventDTO;
55
import java.util.ArrayList;
6+
import java.util.HashSet;
67
import java.util.List;
8+
import java.util.Set;
79
import java.util.TreeMap;
810
import java.util.function.Supplier;
911
import lombok.extern.slf4j.Slf4j;
@@ -84,20 +86,22 @@ private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
8486
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
8587

8688
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
87-
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
89+
Set<TopicPartition> paused = new HashSet<>();
90+
while (!sink.isCancelled() && paused.size() < range.size()) {
8891
var polledRecords = poll(sink, consumer);
8992
range.forEach((tp, fromTo) -> {
9093
polledRecords.records(tp).stream()
9194
.filter(r -> r.offset() < fromTo.to)
9295
.forEach(result::add);
9396

9497
//next position is out of target range -> pausing partition
95-
if (consumer.position(tp) >= fromTo.to) {
98+
if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) {
99+
paused.add(tp);
96100
consumer.pause(List.of(tp));
97101
}
98102
});
99103
}
100-
consumer.resume(consumer.paused());
104+
consumer.resume(paused);
101105
return result;
102106
}
103107
}

0 commit comments

Comments
 (0)