Skip to content

Commit e191469

Browse files
committed
feat: Use current timestamp when metronome replays or catches up (#332)
1 parent feea479 commit e191469

2 files changed

Lines changed: 21 additions & 6 deletions

File tree

connectors/datagen-connectors/src/main/java/com/datasqrl/flinkrunner/connector/datagen/metronome/MetronomeReader.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public void start() {
6868
* second boundary.
6969
*
7070
* <p>The emitted sequence number is derived from wall-clock epoch seconds relative to the first
71-
* observed start second. If the reader wakes up late, repeated calls emit all missing sequence
72-
* numbers with their intended event timestamps until the source catches up.
71+
* observed start second. If the reader wakes up late or recovers from a checkpoint, repeated
72+
* calls emit all missing sequence numbers with the current wall-clock timestamp until the source
73+
* catches up.
7374
*/
7475
@Override
7576
public InputStatus pollNext(ReaderOutput<RowData> output) {
@@ -93,11 +94,10 @@ public InputStatus pollNext(ReaderOutput<RowData> output) {
9394

9495
if (lastEmittedNumber < targetNumber) {
9596
long nextNumber = lastEmittedNumber + 1;
96-
long eventTimestampSec = startTimestampSec + nextNumber;
97-
long eventTimestampMillis = eventTimestampSec * 1000L;
97+
long eventTimestampMillis = currentTimestampSec * 1000L;
9898
var row =
9999
GenericRowData.of(
100-
nextNumber, TimestampData.fromInstant(Instant.ofEpochSecond(eventTimestampSec)));
100+
nextNumber, TimestampData.fromInstant(Instant.ofEpochSecond(currentTimestampSec)));
101101
lastEmittedNumber = nextNumber;
102102
output.collect(row, eventTimestampMillis);
103103

connectors/datagen-connectors/src/test/java/com/datasqrl/flinkrunner/connector/datagen/metronome/MetronomeSourceIT.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,21 @@ ts TIMESTAMP_LTZ(3),
7373
}
7474

7575
@Test
76-
void restoresReaderProgressFromCheckpointedSplit() {
76+
void restoresReaderProgressWithCurrentTimestampFromCheckpointedSplit() {
7777
long startTimestampSec = Instant.now().getEpochSecond() - 3L;
7878
var reader = new MetronomeReader(unusedReaderContext(), 3L);
7979
var output = new CollectingReaderOutput();
8080

8181
try {
8282
reader.addSplits(List.of(new MetronomeSplit(1L, startTimestampSec)));
8383

84+
long beforePollSec = Instant.now().getEpochSecond();
8485
assertThat(reader.pollNext(output)).isEqualTo(InputStatus.MORE_AVAILABLE);
86+
long afterPollSec = Instant.now().getEpochSecond();
87+
8588
assertThat(output.numbers()).containsExactly(2L);
89+
assertThat(output.eventTimestampSecs().get(0)).isBetween(beforePollSec, afterPollSec);
90+
assertThat(output.rowTimestampSecs().get(0)).isBetween(beforePollSec, afterPollSec);
8691
assertThat(reader.snapshotState(1L))
8792
.containsExactly(new MetronomeSplit(2L, startTimestampSec));
8893
} finally {
@@ -138,6 +143,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
138143
private static final class CollectingReaderOutput implements ReaderOutput<RowData> {
139144

140145
private final List<RowData> rows = new ArrayList<>();
146+
private final List<Long> eventTimestamps = new ArrayList<>();
141147

142148
@Override
143149
public void collect(RowData record) {
@@ -147,6 +153,7 @@ public void collect(RowData record) {
147153
@Override
148154
public void collect(RowData record, long timestamp) {
149155
rows.add(record);
156+
eventTimestamps.add(timestamp);
150157
}
151158

152159
@Override
@@ -169,5 +176,13 @@ public void releaseOutputForSplit(String splitId) {}
169176
private List<Long> numbers() {
170177
return rows.stream().map(row -> row.getLong(0)).toList();
171178
}
179+
180+
private List<Long> eventTimestampSecs() {
181+
return eventTimestamps.stream().map(timestamp -> timestamp / 1000L).toList();
182+
}
183+
184+
private List<Long> rowTimestampSecs() {
185+
return rows.stream().map(row -> row.getTimestamp(1, 3).toInstant().getEpochSecond()).toList();
186+
}
172187
}
173188
}

0 commit comments

Comments
 (0)