-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4011]🔥Add ZoneRouteRPCHook for zone-aware topic route filtering #4012
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds a crate-visible module and a new ZoneRouteRPCHook that post-processes GetRouteinfoByTopic responses to optionally filter TopicRouteData by a requested zone, pruning brokers/queues/filter servers outside the zone while allowing brokers whose master is down to remain. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Client
participant NS as NameServer
participant Hook as ZoneRouteRPCHook
note over Client,NS: Client sends GetRouteinfoByTopic with ext fields: ZONE_MODE, ZONE_NAME
Client->>NS: RemotingCommand(GetRouteinfoByTopic)
NS->>NS: Build TopicRouteData (unfiltered)
NS->>Hook: do_after_response(request, response)
alt ZONE_MODE=true AND response OK AND body present
Hook->>Hook: Decode TopicRouteData
Hook->>Hook: filter_by_zone_name(zoneName)
Hook->>Hook: Remove out-of-zone brokers\nafter master-down rules, prune queues and filter_servers
Hook->>NS: Re-encode filtered body
note right of Hook: Case-insensitive zone match\nKeep in-zone or master-down brokers
else No filtering
Hook-->>NS: No-op
end
NS-->>Client: Response (filtered or original)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~40 minutes Pre-merge checks (3 passed, 2 warnings)❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
Poem
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧬 Code graph analysis (1)rocketmq-remoting/src/protocol/body/subscription_group_wrapper.rs (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
🔇 Additional comments (1)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds zone-aware topic route filtering functionality to the RocketMQ name server by implementing a ZoneRouteRPCHook
that intercepts topic route responses and filters broker data based on zone configuration.
- Implements
ZoneRouteRPCHook
struct with RPC hook functionality for zone-aware route filtering - Adds filtering logic that preserves brokers in the same zone or brokers with master down for failover scenarios
- Updates the route module to include the new zone route RPC hook module
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs | New file implementing zone-aware route filtering with RPCHook trait and filter logic |
rocketmq-namesrv/src/route.rs | Module declaration for the new zone_route_rpc_hook module |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
return Ok(()); | ||
} | ||
|
||
let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME); |
Copilot
AI
Sep 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line will panic if get_ext_fields()
returns None
. Since you already checked for ext
existence on line 52, you should reuse that variable instead of calling unwrap()
again.
let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME); | |
// Use ext from above instead of calling get_ext_fields() again | |
let zone_name = ext.get(mix_all::ZONE_NAME); |
Copilot uses AI. Check for mistakes.
} | ||
|
||
pub fn filter_by_zone_name(topic_route_data: &mut TopicRouteData, zone_name: &CheetahString) { | ||
use std::collections::HashMap; |
Copilot
AI
Sep 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use
statement should be placed at the top of the file with other imports rather than inside the function. Move this import to the module-level imports section.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (2)
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (2)
64-71
: Avoid a secondunwrap
and validateZONE_NAME
once.Reuse the
ext
map, avoidunwrap
, and short-circuit on missing/empty zone name. Optionally trim whitespace.- let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME); - if zone_name.is_none() { - return Ok(()); - } - let zone_name = zone_name.unwrap(); - if zone_name.is_empty() { - return Ok(()); - } + let zone_name = match ext.get(mix_all::ZONE_NAME) { + Some(z) if !z.is_empty() => z, + _ => return Ok(()), + };
86-104
: Reduce cloning and pass count usingretain
+ tracked removals.Current approach clones each BrokerData/QueueData and does multiple passes. Retain in-place and track removed names/addresses to prune queues and filter servers in O(n) without clones.
- let mut broker_data_reserved = Vec::new(); - let mut broker_data_removed: HashMap<CheetahString, BrokerData> = HashMap::new(); - - for bd in topic_route_data.broker_datas.iter() { - let addrs = bd.broker_addrs(); - let master_down = !addrs.contains_key(&MASTER_ID); - let same_zone = bd - .zone_name() - .map(|z| z.eq_ignore_ascii_case(zone_name)) - .unwrap_or(false); - if master_down || same_zone { - broker_data_reserved.push(bd.clone()); - } else { - broker_data_removed.insert(bd.broker_name().clone(), bd.clone()); - } - } - topic_route_data.broker_datas = broker_data_reserved; - - // Filter queue data - let mut queue_data_reserved = Vec::new(); - for qd in topic_route_data.queue_datas.iter() { - if !broker_data_removed.contains_key(&qd.broker_name) { - queue_data_reserved.push(qd.clone()); - } - } - topic_route_data.queue_datas = queue_data_reserved; - - // Remove filter server entries whose broker addresses belong to removed brokers - if !topic_route_data.filter_server_table.is_empty() { - for (_, bd) in broker_data_removed.iter() { - for addr in bd.broker_addrs().values() { - topic_route_data.filter_server_table.remove(addr); - } - } - } + use std::collections::{HashSet, HashMap}; + let mut removed_names: HashSet<CheetahString> = HashSet::new(); + let mut removed_addrs: Vec<CheetahString> = Vec::new(); + + topic_route_data.broker_datas.retain(|bd| { + let addrs = bd.broker_addrs(); + let master_down = !addrs.contains_key(&MASTER_ID); + let same_zone = bd + .zone_name() + .map(|z| z.eq_ignore_ascii_case(zone_name)) + .unwrap_or(false); + let keep = master_down || same_zone; + if !keep { + removed_names.insert(bd.broker_name().clone()); + removed_addrs.extend(addrs.values().cloned()); + } + keep + }); + + topic_route_data + .queue_datas + .retain(|qd| !removed_names.contains(&qd.broker_name)); + + for addr in removed_addrs { + topic_route_data.filter_server_table.remove(&addr); + }Also applies to: 106-114, 117-123
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-namesrv/src/route.rs
(1 hunks)rocketmq-namesrv/src/route/zone_route_rpc_hook.rs
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (2)
rocketmq-remoting/src/protocol/remoting_command.rs (1)
code
(578-580)rocketmq-remoting/src/protocol/route/route_data_view.rs (1)
zone_name
(130-132)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: test
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (1)
75-75
: Drop theset_body_mut_ref
ownership check. It takesimpl Into<Bytes>
(e.g., consumes aVec<u8>
into aBytes
), so it owns the data just likeset_body
and can’t dangle.
|
||
pub(crate) mod batch_unregistration_service; | ||
pub mod route_info_manager; | ||
pub(crate) mod zone_route_rpc_hook; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Module addition looks good; ensure the hook is actually registered.
The module is declared but wiring isn’t shown. Verify namesrv runtime installs ZoneRouteRPCHook
on the remoting client/server for GetRouteinfoByTopic requests.
🏁 Script executed:
#!/bin/bash
# Find hook usage/registration.
rg -n -C3 'ZoneRouteRPCHook|RPCHook' rocketmq-namesrv rocketmq-remoting --type=rust
# Grep namesrv startup/bootstrapping code.
rg -n -C3 'Namesrv|NameSrv|start|bootstrap' rocketmq-namesrv --type=rust
Length of output: 34591
Register ZoneRouteRPCHook in initiate_rpc_hooks
In rocketmq-namesrv/src/bootstrap.rs the initiate_rpc_hooks() method still only logs “RPC hooks are not supported yet.” Replace the stub with calls to register ZoneRouteRPCHook on both the remoting client and server so that GetRouteInfoByTopic requests use the hook.
🤖 Prompt for AI Agents
In rocketmq-namesrv/src/bootstrap.rs around the initiate_rpc_hooks() stub,
replace the placeholder log with actual registration of ZoneRouteRPCHook: import
the zone_route_rpc_hook module (already declared in route.rs), instantiate the
ZoneRouteRPCHook and call the remoting client and remoting server hook
registration APIs to add the hook (e.g., remoting_client.register_rpc_hook(...)
and remoting_server.register_rpc_hook(...) or the equivalent methods your
remoting types expose) so that GetRouteInfoByTopic requests are intercepted;
propagate or log any errors from registration and ensure the hook instance lives
long enough (store it or register a boxed/arc reference as required by the API).
let zone_mode = if let Some(ext) = request.get_ext_fields() { | ||
ext.get(mix_all::ZONE_MODE) | ||
.unwrap_or(&"false".into()) | ||
.parse::<bool>() | ||
.unwrap_or(false) | ||
} else { | ||
return Ok(()); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Fix temporary reference in unwrap_or
and make boolean parsing robust.
ext.get(...).unwrap_or(&"false".into())
borrows a temporary and will not compile (“temporary value dropped while borrowed”). Also, parse::<bool>()
only accepts lowercase "true"/"false". Prefer explicit, case-insensitive checks and avoid the temporary.
- let zone_mode = if let Some(ext) = request.get_ext_fields() {
- ext.get(mix_all::ZONE_MODE)
- .unwrap_or(&"false".into())
- .parse::<bool>()
- .unwrap_or(false)
- } else {
- return Ok(());
- };
+ let ext = match request.get_ext_fields() {
+ Some(ext) => ext,
+ None => return Ok(()),
+ };
+ let zone_mode = ext
+ .get(mix_all::ZONE_MODE)
+ .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
+ .unwrap_or(false);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let zone_mode = if let Some(ext) = request.get_ext_fields() { | |
ext.get(mix_all::ZONE_MODE) | |
.unwrap_or(&"false".into()) | |
.parse::<bool>() | |
.unwrap_or(false) | |
} else { | |
return Ok(()); | |
}; | |
let ext = match request.get_ext_fields() { | |
Some(ext) => ext, | |
None => return Ok(()), | |
}; | |
let zone_mode = ext | |
.get(mix_all::ZONE_MODE) | |
.map(|v| v.eq_ignore_ascii_case("true") || v == "1") | |
.unwrap_or(false); |
🤖 Prompt for AI Agents
In rocketmq-namesrv/src/route/zone_route_rpc_hook.rs around lines 52 to 59, the
code uses ext.get(...).unwrap_or(&"false".into()) which borrows a temporary and
will not compile, and then uses parse::<bool>() which only accepts lowercase
"true"/"false". Replace that logic by first obtaining the Option reference from
request.get_ext_fields(), early-returning if None (keep the existing return
Ok(())), then fetch the entry with ext.get(mix_all::ZONE_MODE).map(|s|
s.as_str()) and compute zone_mode by comparing case-insensitively (e.g.
s.eq_ignore_ascii_case("true")) or defaulting to false when None; this avoids
borrowing a temporary and handles "True"/"FALSE" variants robustly.
let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?; | ||
filter_by_zone_name(&mut topic_route_data, zone_name); | ||
response.set_body_mut_ref(topic_route_data.encode()?); | ||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Don’t convert a successful RPC into an error when decode/encode fails.
?
will bubble errors from the hook and fail the call. Make filtering best-effort: if (de)serialization fails, skip filtering and return Ok.
- let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
- filter_by_zone_name(&mut topic_route_data, zone_name);
- response.set_body_mut_ref(topic_route_data.encode()?);
+ if let Ok(mut topic_route_data) = TopicRouteData::decode(response.get_body().unwrap()) {
+ filter_by_zone_name(&mut topic_route_data, zone_name);
+ if let Ok(bytes) = topic_route_data.encode() {
+ response.set_body_mut_ref(bytes);
+ }
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?; | |
filter_by_zone_name(&mut topic_route_data, zone_name); | |
response.set_body_mut_ref(topic_route_data.encode()?); | |
Ok(()) | |
if let Ok(mut topic_route_data) = TopicRouteData::decode(response.get_body().unwrap()) { | |
filter_by_zone_name(&mut topic_route_data, zone_name); | |
if let Ok(bytes) = topic_route_data.encode() { | |
response.set_body_mut_ref(bytes); | |
} | |
} | |
Ok(()) |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4012 +/- ##
==========================================
- Coverage 26.64% 26.62% -0.03%
==========================================
Files 564 565 +1
Lines 80510 80580 +70
==========================================
Hits 21453 21453
- Misses 59057 59127 +70 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Which Issue(s) This PR Fixes(Closes)
Fixes #4011
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Tests