Skip to content

Commit b32fcef

Browse files
authored
[Pull-based ingestion] Emit lag metric for pull-based ingestion poller (#17977)
* emit lag metric for pull-based ingestion poller Signed-off-by: Yupeng Fu <yupeng@uber.com> * changelog Signed-off-by: Yupeng Fu <yupeng@uber.com> * fix ut Signed-off-by: Yupeng Fu <yupeng@uber.com> --------- Signed-off-by: Yupeng Fu <yupeng@uber.com>
1 parent 9a3fc30 commit b32fcef

File tree

12 files changed

+68
-12
lines changed

12 files changed

+68
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- Unset discovery nodes for every transport node actions request ([#17682](https://github.com/opensearch-project/OpenSearch/pull/17682))
1111
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
1212
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
13+
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
14+
1315

1416
### Changed
1517
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaMessage.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717
public class KafkaMessage implements Message<byte[]> {
1818
private final byte[] key;
1919
private final byte[] payload;
20+
private final Long timestamp;
2021

2122
/**
2223
* Constructor
2324
* @param key the key of the message
2425
* @param payload the payload of the message
26+
* @param timestamp the timestamp of the message in milliseconds
2527
*/
26-
public KafkaMessage(@Nullable byte[] key, byte[] payload) {
28+
public KafkaMessage(@Nullable byte[] key, byte[] payload, Long timestamp) {
2729
this.key = key;
2830
this.payload = payload;
31+
this.timestamp = timestamp;
2932
}
3033

3134
/**
@@ -40,4 +43,9 @@ public byte[] getKey() {
4043
public byte[] getPayload() {
4144
return payload;
4245
}
46+
47+
@Override
48+
public Long getTimestamp() {
49+
return timestamp;
50+
}
4351
}

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
235235
}
236236
lastFetchedOffset = currentOffset;
237237
KafkaOffset kafkaOffset = new KafkaOffset(currentOffset);
238-
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value());
238+
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value(), messageAndOffset.timestamp());
239239
results.add(new ReadResult<>(kafkaOffset, message));
240240
}
241241
return results;

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaMessageTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@ public void testConstructorAndGetters() {
1616
byte[] key = { 1, 2, 3 };
1717
byte[] payload = { 4, 5, 6 };
1818

19-
KafkaMessage message = new KafkaMessage(key, payload);
19+
KafkaMessage message = new KafkaMessage(key, payload, 1000L);
2020

2121
Assert.assertArrayEquals(key, message.getKey());
2222
Assert.assertArrayEquals(payload, message.getPayload());
23+
Assert.assertEquals(1000L, message.getTimestamp().longValue());
2324
}
2425

2526
public void testConstructorWithNullKey() {
2627
byte[] payload = { 4, 5, 6 };
2728

28-
KafkaMessage message = new KafkaMessage(null, payload);
29+
KafkaMessage message = new KafkaMessage(null, payload, null);
2930

3031
assertNull(message.getKey());
3132
Assert.assertArrayEquals(payload, message.getPayload());
33+
Assert.assertNull(message.getTimestamp());
3234
}
3335
}

plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisMessage.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,25 @@
1515
*/
1616
public class KinesisMessage implements Message<byte[]> {
1717
private final byte[] payload;
18+
private final Long timestamp;
1819

1920
/**
2021
* Constructor
2122
* @param payload the payload of the message
23+
* @param timestamp the timestamp of the message in milliseconds
2224
*/
23-
public KinesisMessage(byte[] payload) {
25+
public KinesisMessage(byte[] payload, Long timestamp) {
2426
this.payload = payload;
27+
this.timestamp = timestamp;
2528
}
2629

2730
@Override
2831
public byte[] getPayload() {
2932
return payload;
3033
}
34+
35+
@Override
36+
public Long getTimestamp() {
37+
return timestamp;
38+
}
3139
}

plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ private synchronized List<ReadResult<SequenceNumber, KinesisMessage>> fetch(
234234

235235
for (Record record : records) {
236236
SequenceNumber sequenceNumber1 = new SequenceNumber(record.sequenceNumber());
237-
KinesisMessage message = new KinesisMessage(record.data().asByteArray());
237+
Long timestamp = record.approximateArrivalTimestamp() != null ? record.approximateArrivalTimestamp().toEpochMilli() : null;
238+
KinesisMessage message = new KinesisMessage(record.data().asByteArray(), timestamp);
238239
results.add(new ReadResult<>(sequenceNumber1, message));
239240
}
240241

plugins/ingestion-kinesis/src/test/java/org/opensearch/plugin/kinesis/KinesisMessageTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
public class KinesisMessageTests extends OpenSearchTestCase {
1515
public void testConstructorAndGetters() {
1616
byte[] payload = { 1, 2, 3 };
17-
KinesisMessage message = new KinesisMessage(payload);
17+
KinesisMessage message = new KinesisMessage(payload, 1000L);
1818

1919
Assert.assertArrayEquals("Payload should be correctly initialized and returned", payload, message.getPayload());
20+
Assert.assertEquals(1000L, message.getTimestamp().longValue());
2021
}
2122

2223
public void testConstructorWithNullPayload() {
23-
KinesisMessage message = new KinesisMessage(null);
24+
KinesisMessage message = new KinesisMessage(null, null);
2425

2526
Assert.assertNull("Payload should be null", message.getPayload());
27+
Assert.assertNull("Timestamp should be null", message.getTimestamp());
2628
}
2729
}

server/src/main/java/org/opensearch/index/Message.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,10 @@
1616
@ExperimentalApi
1717
public interface Message<T> {
1818
T getPayload();
19+
20+
/**
21+
* Get the timestamp of the message in milliseconds
22+
* @return the timestamp of the message
23+
*/
24+
Long getTimestamp();
1925
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class DefaultStreamPoller implements StreamPoller {
3939
private volatile boolean paused;
4040
private volatile IngestionErrorStrategy errorStrategy;
4141

42+
private volatile long lastPolledMessageTimestamp = 0;
43+
4244
private IngestionShardConsumer consumer;
4345

4446
private ExecutorService consumerThread;
@@ -247,7 +249,7 @@ private IngestionShardPointer processRecords(
247249
}
248250
totalPolledCount.inc();
249251
blockingQueueContainer.add(result);
250-
252+
lastPolledMessageTimestamp = result.getMessage().getTimestamp() == null ? 0 : result.getMessage().getTimestamp();
251253
logger.debug(
252254
"Put message {} with pointer {} to the blocking queue",
253255
String.valueOf(result.getMessage().getPayload()),
@@ -366,9 +368,17 @@ public PollingIngestStats getStats() {
366368
builder.setTotalPolledCount(totalPolledCount.count());
367369
builder.setTotalProcessedCount(blockingQueueContainer.getTotalProcessedCount());
368370
builder.setTotalSkippedCount(blockingQueueContainer.getTotalSkippedCount());
371+
builder.setLagInMillis(computeLag());
369372
return builder.build();
370373
}
371374

375+
/**
376+
* Returns the lag in milliseconds since the last polled message
377+
*/
378+
private long computeLag() {
379+
return System.currentTimeMillis() - lastPolledMessageTimestamp;
380+
}
381+
372382
public State getState() {
373383
return this.state;
374384
}

server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,16 @@ public PollingIngestStats(StreamInput in) throws IOException {
3737
long totalSkippedCount = in.readLong();
3838
this.messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount);
3939
long totalPolledCount = in.readLong();
40-
this.consumerStats = new ConsumerStats(totalPolledCount);
40+
long lagInMillis = in.readLong();
41+
this.consumerStats = new ConsumerStats(totalPolledCount, lagInMillis);
4142
}
4243

4344
@Override
4445
public void writeTo(StreamOutput out) throws IOException {
4546
out.writeLong(messageProcessorStats.totalProcessedCount);
4647
out.writeLong(messageProcessorStats.totalSkippedCount);
4748
out.writeLong(consumerStats.totalPolledCount);
49+
out.writeLong(consumerStats.lagInMillis);
4850
}
4951

5052
@Override
@@ -56,6 +58,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
5658
builder.endObject();
5759
builder.startObject("consumer_stats");
5860
builder.field("total_polled_count", consumerStats.totalPolledCount);
61+
builder.field("lag_in_millis", consumerStats.lagInMillis);
5962
builder.endObject();
6063
builder.endObject();
6164
return builder;
@@ -93,7 +96,7 @@ public record MessageProcessorStats(long totalProcessedCount, long totalSkippedC
9396
* Stats for consumer (poller)
9497
*/
9598
@ExperimentalApi
96-
public record ConsumerStats(long totalPolledCount) {
99+
public record ConsumerStats(long totalPolledCount, long lagInMillis) {
97100
}
98101

99102
/**
@@ -104,6 +107,7 @@ public static class Builder {
104107
private long totalProcessedCount;
105108
private long totalSkippedCount;
106109
private long totalPolledCount;
110+
private long lagInMillis;
107111

108112
public Builder() {}
109113

@@ -122,9 +126,14 @@ public Builder setTotalSkippedCount(long totalSkippedCount) {
122126
return this;
123127
}
124128

129+
public Builder setLagInMillis(long lagInMillis) {
130+
this.lagInMillis = lagInMillis;
131+
return this;
132+
}
133+
125134
public PollingIngestStats build() {
126135
MessageProcessorStats messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount);
127-
ConsumerStats consumerStats = new ConsumerStats(totalPolledCount);
136+
ConsumerStats consumerStats = new ConsumerStats(totalPolledCount, lagInMillis);
128137
return new PollingIngestStats(messageProcessorStats, consumerStats);
129138
}
130139
}

server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ public byte[] getPayload() {
134134
return payload;
135135
}
136136

137+
@Override
138+
public Long getTimestamp() {
139+
return System.currentTimeMillis();
140+
}
141+
137142
@Override
138143
public String toString() {
139144
return new String(payload, StandardCharsets.UTF_8);

server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public void testToXContent() throws IOException {
3333
+ stats.getMessageProcessorStats().totalSkippedCount()
3434
+ "},\"consumer_stats\":{\"total_polled_count\":"
3535
+ stats.getConsumerStats().totalPolledCount()
36+
+ ",\"lag_in_millis\":"
37+
+ stats.getConsumerStats().lagInMillis()
3638
+ "}}}";
3739

3840
assertEquals(expected, builder.toString());
@@ -56,6 +58,7 @@ private PollingIngestStats createTestInstance() {
5658
.setTotalProcessedCount(randomNonNegativeLong())
5759
.setTotalSkippedCount(randomNonNegativeLong())
5860
.setTotalPolledCount(randomNonNegativeLong())
61+
.setLagInMillis(randomNonNegativeLong())
5962
.build();
6063
}
6164
}

0 commit comments

Comments
 (0)