Skip to content

Enable seeking individual topic partitions#829

Open
severinson wants to merge 4 commits intoapache:masterfrom
severinson:master
Open

Enable seeking individual topic partitions#829
severinson wants to merge 4 commits intoapache:masterfrom
severinson:master

Conversation

@severinson
Copy link

@severinson severinson commented Aug 16, 2022

Currently, seeking on a consumer with a KeyShared subscription fails. This PR removes an unnecessary check to seek the underlying partitionConsumer.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also help add a test to cover the new changes.

@severinson
Copy link
Author

@codelipenghui I've added tests.

Comment on lines +583 to +585
if mid.partitionIdx < 0 {
return newError(SeekFailed, "partitionIdx is negative")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this break the non-partitioned topic message acknowledgment?
The partition index -1 means the message from a non-partitioned topic.

@freeznet @wolfstudy Please help confirm

Copy link
Author

@severinson severinson Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that really true? The pre-existing code would do the following:

return c.consumers[mid.partitionIdx].Seek(mid)

This suggests that the partition index is 0 for non-partitioned topics. Otherwise seek() on a non-partitioned topic would've always resulted in an index out of bounds panic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Go SDK, the partition index less than 0 is not allowed.

// did we receive a valid partition index?
	if partition < 0 || partition >= len(c.consumers) {
		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
			partition, len(c.consumers))
		return trackingMessageID{}, false
	}

And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

Copy link
Author

@severinson severinson Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. I have a few questions.

And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.

I don't understand what you mean. I'm not changing the current logic. I'm just passing through the message id to the partitionConsumer responsible for the partition.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

All right. Do you know what work still remains to have the Go client support seeking by partition?

@wolfstudy wolfstudy added this to the v0.10.0 milestone Aug 22, 2022
Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few changes, please check

Comment on lines +583 to +585
if mid.partitionIdx < 0 {
return newError(SeekFailed, "partitionIdx is negative")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Go SDK, the partition index less than 0 is not allowed.

// did we receive a valid partition index?
	if partition < 0 || partition >= len(c.consumers) {
		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
			partition, len(c.consumers))
		return trackingMessageID{}, false
	}

And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

@wolfstudy
Copy link
Member

If you want to directly perform the seek operation on a partitioned topic, you can refer to the behavior here. #782

@codelipenghui
Copy link
Contributor

@severinson After checking more details about this part. Before 2.8.0, the Java client only supported seek with the earliest/latest position. apache/pulsar#10033 has introduced a new API https://github.com/apache/pulsar/pull/10033/files#diff-6010f94fead60a5a3ed8aa58a37fe96fdf2cd9c5c1573989cdf89abcb9f9c256R613 which able to provide seek message ID for each partition or topic(For multi-topic/regex consumer)

I think the go client should also follow the same way.

@RobertIndie RobertIndie modified the milestones: v0.10.0, v0.11.0 Mar 27, 2023
@RobertIndie RobertIndie modified the milestones: v0.11.0, v0.12.0 Jul 4, 2023
@RobertIndie RobertIndie modified the milestones: v0.12.0, v0.13.0 Jan 10, 2024
@RobertIndie RobertIndie modified the milestones: v0.13.0, v0.14.0 Jul 15, 2024
@RobertIndie RobertIndie modified the milestones: v0.14.0, v0.15.0 Oct 8, 2024
@RobertIndie RobertIndie modified the milestones: v0.15.0, v0.16.0 May 15, 2025
@RobertIndie RobertIndie modified the milestones: v0.16.0, v0.17.0 Jul 29, 2025
@RobertIndie RobertIndie removed this from the v0.17.0 milestone Oct 23, 2025
@RobertIndie RobertIndie added this to the v0.18.0 milestone Oct 23, 2025
@RobertIndie RobertIndie modified the milestones: v0.18.0, v0.19.0 Dec 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants