Description
Before Creating the Enhancement Request
- I have confirmed that this should be classified as an enhancement rather than a bug/feature.
Summary
The current implementation of the method whichTopicByConsumer is as follows:
    public Set<String> whichTopicByConsumer(final String group) {
        Set<String> topics = new HashSet<>();
        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
            if (arrays.length == 2) {
                if (group.equals(arrays[1])) {
                    topics.add(arrays[0]);
                }
            }
        }
        return topics;
    }

This method returns all topics consumed by a specified group. However, it iterates over the entire offsetTable and splits every key, so the hash table's lookup capability is never used and the query is very slow. As a result, a large portion of my CPU time is spent in String.split.
Below is the CPU flame graph. Although it is from an older version of RocketMQ, I have reviewed the code in the latest version, and this part still hasn't been optimized.

Motivation
I need to collect the offset progress of groups every minute, which involves calling the inefficient whichTopicByConsumer method. This cost is entirely avoidable: querying the topics of a single group should not require traversing all the data.
Describe the Solution You'd Like
I would like the whichTopicByConsumer method to be optimized to avoid the need for a full traversal of the offsetTable when querying the topics corresponding to a specific group. A more efficient approach would involve introducing a pre-built mapping (e.g., a hash table or a similar indexed data structure) that directly maps consumer groups to their associated topics. This mapping can be maintained dynamically and updated whenever changes occur in the offsetTable, ensuring consistency.
With this solution, the query becomes a direct lookup in the mapping, reducing the complexity from O(N), where N is the number of entries in the offsetTable, to O(1) for the lookup or O(K) to copy the result, where K is the number of topics associated with the group. This would significantly improve query efficiency and remove the CPU cost of repeatedly calling String.split on every key.
By implementing this optimization, the system would handle periodic offset progress collection much more efficiently, especially in scenarios where the number of groups and topics grows large.
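A minimal sketch of such a mapping, to make the idea concrete. The class GroupTopicIndex and its methods onKeyAdded and onKeyRemoved are hypothetical names, not existing RocketMQ APIs, and the "@" separator is assumed to match the TOPIC_GROUP_SEPARATOR used for offsetTable keys. The broker would need to call the two hooks wherever it adds or removes an offsetTable key.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper (not part of RocketMQ): a secondary index from
// consumer group to the topics that have an entry in offsetTable.
class GroupTopicIndex {
    // Assumption: same separator as the "topic@group" keys of offsetTable.
    static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentMap<String, Set<String>> topicsByGroup = new ConcurrentHashMap<>();

    // Call when a "topic@group" key is inserted into offsetTable.
    void onKeyAdded(String topicAtGroup) {
        String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
        if (arrays.length != 2) {
            return; // same validity rule as the existing method
        }
        // compute() runs atomically per group, so an add cannot race with
        // the removal of an empty set in onKeyRemoved.
        topicsByGroup.compute(arrays[1], (group, topics) -> {
            Set<String> result = (topics != null) ? topics : ConcurrentHashMap.<String>newKeySet();
            result.add(arrays[0]);
            return result;
        });
    }

    // Call when a "topic@group" key is removed from offsetTable.
    void onKeyRemoved(String topicAtGroup) {
        String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
        if (arrays.length != 2) {
            return;
        }
        topicsByGroup.computeIfPresent(arrays[1], (group, topics) -> {
            topics.remove(arrays[0]);
            // Returning null drops the group once it has no topics left.
            return topics.isEmpty() ? null : topics;
        });
    }

    // Direct lookup plus an O(K) defensive copy; no scan and no split per query.
    Set<String> whichTopicByConsumer(String group) {
        Set<String> topics = topicsByGroup.get(group);
        return (topics == null) ? new HashSet<>() : new HashSet<>(topics);
    }
}
```

The split now happens once per key change rather than once per key on every query. The index would also have to be rebuilt from the offsetTable when offsets are loaded from disk, which this sketch does not cover.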
Describe Alternatives You've Considered
One approach is to pass the specific topic explicitly when calling the getConsumeStats method, which avoids executing whichTopicByConsumer. However, this is only a workaround: whichTopicByConsumer itself remains inefficient for any caller that does not know the topic in advance.
Additional Context
No response