Skip to content

Commit ac60cdf

Browse files
committed
FE: fix missing previousCursorId in case of empty pollRange (#550)
1 parent 52a7241 commit ac60cdf

File tree

4 files changed

+9
-10
lines changed

4 files changed

+9
-10
lines changed

api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
4040
messagesProcessing.sentConsumingInfo(sink, records);
4141
}
4242

43-
// cursor is null if target partitions were fully polled (no, need to do paging)
44-
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
45-
messagesProcessing.sendFinishEvents(sink, cursor);
43+
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor,
44+
boolean hasNext) {
45+
messagesProcessing.sendFinishEvents(sink, cursor, hasNext);
4646
sink.complete();
4747
}
4848
}

api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.kafbat.ui.model.TopicMessageConsumingDTO;
44
import io.kafbat.ui.model.TopicMessageEventDTO;
55
import io.kafbat.ui.model.TopicMessagePageCursorDTO;
6-
import javax.annotation.Nullable;
76
import reactor.core.publisher.FluxSink;
87

98
class ConsumingStats {
@@ -28,8 +27,8 @@ void incFilterApplyError() {
2827
filterApplyErrors++;
2928
}
3029

31-
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
32-
String previousCursorId = cursor != null ? cursor.getPreviousCursorId() : null;
30+
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor, boolean hasNext) {
31+
String previousCursorId = cursor.getPreviousCursorId();
3332
sink.next(
3433
new TopicMessageEventDTO()
3534
.type(TopicMessageEventDTO.TypeEnum.DONE)
@@ -39,7 +38,7 @@ void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Track
3938
: null
4039
)
4140
.nextCursor(
42-
cursor != null
41+
hasNext
4342
? new TopicMessagePageCursorDTO().id(cursor.registerCursor())
4443
: null
4544
)

api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polled
7272
}
7373
}
7474

75-
void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
75+
void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor, boolean hasNext) {
7676
if (!sink.isCancelled()) {
77-
consumingStats.sendFinishEvent(sink, cursor);
77+
consumingStats.sendFinishEvent(sink, cursor, hasNext);
7878
}
7979
}
8080

api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
6464
if (sink.isCancelled()) {
6565
log.debug("Polling finished due to sink cancellation");
6666
}
67-
sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
67+
sendFinishStatsAndCompleteSink(sink, cursor, !pollRange.isEmpty());
6868
log.debug("Polling finished");
6969
} catch (InterruptException kafkaInterruptException) {
7070
log.debug("Polling finished due to thread interruption");

0 commit comments

Comments
 (0)