Enable seeking individual topic partitions#829
Enable seeking individual topic partitions#829severinson wants to merge 4 commits intoapache:masterfrom
Conversation
codelipenghui
left a comment
There was a problem hiding this comment.
Please also help add a test to cover the new changes.
|
@codelipenghui I've added tests. |
| if mid.partitionIdx < 0 { | ||
| return newError(SeekFailed, "partitionIdx is negative") | ||
| } |
There was a problem hiding this comment.
Will this break the non-partitioned topic message acknowledgment?
The partition index -1 means the message from a non-partitioned topic.
@freeznet @wolfstudy Please help confirm
There was a problem hiding this comment.
Is that really true? The pre-existing code would do the following:
return c.consumers[mid.partitionIdx].Seek(mid)
This suggests that the partition index is 0 for non-partitioned topics. Otherwise seek() on a non-partitioned topic would've always resulted in an index out of bounds panic.
There was a problem hiding this comment.
In Go SDK, the partition index less than 0 is not allowed.
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return trackingMessageID{}, false
}
And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.
The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.
There was a problem hiding this comment.
Thanks for the comments. I have a few questions.
And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.
I don't understand what you mean. I'm not changing the current logic. I'm just passing through the message id to the partitionConsumer responsible for the partition.
The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.
All right. Do you know what work still remains to have the Go client support seeking by partition?
wolfstudy
left a comment
There was a problem hiding this comment.
Just a few changes, please check
| if mid.partitionIdx < 0 { | ||
| return newError(SeekFailed, "partitionIdx is negative") | ||
| } |
There was a problem hiding this comment.
In Go SDK, the partition index less than 0 is not allowed.
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return trackingMessageID{}, false
}
And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.
The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.
|
If you want to directly perform the seek operation on a partitioned topic, you can refer to the behavior here. #782 |
|
@severinson After checking more details about this part. Before 2.8.0, the Java client only supported seek with the earliest/latest position. apache/pulsar#10033 has introduced a new API https://github.com/apache/pulsar/pull/10033/files#diff-6010f94fead60a5a3ed8aa58a37fe96fdf2cd9c5c1573989cdf89abcb9f9c256R613 which able to provide seek message ID for each partition or topic(For multi-topic/regex consumer) I think the go client should also follow the same way. |
Currently, seeking on a consumer with a KeyShared subscription fails. This PR removes an unnecessary check to seek the underlying partitionConsumer.