Conversation

mxsm
Owner

@mxsm mxsm commented Sep 10, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4011

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Zone-aware routing for topic route responses: when zone mode is enabled and a zone name is provided, results are filtered to brokers within the specified zone, retaining essential masters and pruning related queues and filter servers. Matching is case-insensitive. No changes when zone mode is disabled.
  • Tests

    • Adjusted a subscription-group test assertion related to data-version timestamp ordering.

@Copilot Copilot AI review requested due to automatic review settings September 10, 2025 16:15
Contributor

coderabbitai bot commented Sep 10, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a crate-visible module and a new ZoneRouteRPCHook that post-processes GetRouteinfoByTopic responses to optionally filter TopicRouteData by a requested zone, pruning brokers/queues/filter servers outside the zone while allowing brokers whose master is down to remain.
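
The keep rule in this walkthrough can be sketched with plain standard-library types. This is a minimal sketch rather than the PR's code: `Broker`, `keep_broker`, the `broker` helper, and the sample addresses are stand-ins assumed for illustration, and `MASTER_ID = 0` is an assumed value (the real hook works on `BrokerData` and `CheetahString`).

```rust
use std::collections::HashMap;

/// Assumed broker id of the master replica (stand-in for `MASTER_ID`).
const MASTER_ID: u64 = 0;

/// Simplified, hypothetical stand-in for a broker entry in a topic route.
struct Broker {
    zone: Option<String>,
    /// broker id -> address
    addrs: HashMap<u64, String>,
}

/// A broker survives filtering if it is in the requested zone
/// (ASCII case-insensitive) or if it has no master address registered.
fn keep_broker(broker: &Broker, zone_name: &str) -> bool {
    let master_down = !broker.addrs.contains_key(&MASTER_ID);
    let same_zone = broker
        .zone
        .as_deref()
        .map(|z| z.eq_ignore_ascii_case(zone_name))
        .unwrap_or(false);
    master_down || same_zone
}

/// Hypothetical helper used only to build example brokers.
fn broker(zone: Option<&str>, ids: &[u64]) -> Broker {
    Broker {
        zone: zone.map(str::to_string),
        addrs: ids
            .iter()
            .map(|id| (*id, format!("10.0.0.{}:10911", id)))
            .collect(),
    }
}
```

Under these assumptions, a broker in another zone is dropped only while its master is registered; once the master entry is missing, the broker is kept regardless of zone so that clients can still reach its slaves.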

Changes

| Cohort / File(s) | Summary |
|---|---|
| Module registration: rocketmq-namesrv/src/route.rs | Declares pub(crate) mod zone_route_rpc_hook; to expose the new hook module within the crate. |
| Zone-aware route filtering hook: rocketmq-namesrv/src/route/zone_route_rpc_hook.rs | Adds ZoneRouteRPCHook implementing RPCHook with no-op pre-request and a post-response handler that, for GetRouteinfoByTopic responses when ZONE_MODE=true and ZONE_NAME present, decodes TopicRouteData, calls filter_by_zone_name, prunes broker_datas/queue_datas/filter_server_table, and re-encodes the body. Exposes pub fn filter_by_zone_name(...). |
| Test adjustment: rocketmq-remoting/src/protocol/body/subscription_group_wrapper.rs | Adjusts a test assertion comparing data_version.timestamp to DataVersion::default().timestamp from >= to <= (test-only change). |

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor Client
    participant NS as NameServer
    participant Hook as ZoneRouteRPCHook
    note over Client,NS: Client sends GetRouteinfoByTopic with ext fields: ZONE_MODE, ZONE_NAME

    Client->>NS: RemotingCommand(GetRouteinfoByTopic)
    NS->>NS: Build TopicRouteData (unfiltered)
    NS->>Hook: do_after_response(request, response)
    alt ZONE_MODE=true AND response OK AND body present
        Hook->>Hook: Decode TopicRouteData
        Hook->>Hook: filter_by_zone_name(zoneName)
        Hook->>Hook: Remove out-of-zone brokers\nafter master-down rules, prune queues and filter_servers
        Hook->>NS: Re-encode filtered body
        note right of Hook: Case-insensitive zone match\nKeep in-zone or master-down brokers
    else No filtering
        Hook-->>NS: No-op
    end
    NS-->>Client: Response (filtered or original)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Pre-merge checks (3 passed, 2 warnings)

❌ Failed checks (2 warnings)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Title Check | ⚠️ Warning | The current title includes the issue number prefix and an emoji, which adds noise and distracts from a concise summary of the change. It is longer than necessary and does not follow the guideline of a short, single-sentence description. A clearer title would omit decorative or non-essential elements and directly state the primary feature being added. | Update the title to remove the issue bracket and emoji, for example: “Add ZoneRouteRPCHook for zone-aware topic route filtering.” |
| Out of Scope Changes Check | ⚠️ Warning | The PR includes a test assertion change in rocketmq-remoting/src/protocol/body/subscription_group_wrapper.rs that is unrelated to the ZoneRouteRPCHook feature and the objectives of issue #4011. This modification does not align with the scope of adding the zone-aware RPC hook. It should be isolated or deferred to maintain clarity of the feature-focused changes. | Remove the subscription_group_wrapper test change from this PR or move it to a dedicated PR focused on remoting subscription group tests. |
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Linked Issues Check | ✅ Passed | The PR implements the ZoneRouteRPCHook with do_after_response logic that filters TopicRouteData by zone exactly as specified in issue #4011 and it updates rocketmq-namesrv/src/route.rs to export the new module. The filtering behavior retains brokers in the requested zone or fallback slaves and prunes queue and filter server entries, matching the linked issue’s primary objectives. All core coding requirements are satisfied. Remaining tasks such as hook registration wiring, integration tests, and documentation are noted as follow-ups and are outside the initial implementation scope. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |

Poem

I hop through maps and trim the threads,
Keeping close the zone where carrots spread.
Masters gone? I leave the trail,
So hungry hops still catch the tale.
A rabbit's prune — precise, not rash,
Routes grow snug beneath my patch. 🥕


📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bd994a2 and 1321487.

📒 Files selected for processing (1)
  • rocketmq-remoting/src/protocol/body/subscription_group_wrapper.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-remoting/src/protocol/body/subscription_group_wrapper.rs (1)
rocketmq-remoting/src/protocol.rs (1)
  • timestamp (380-382)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: test
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-remoting/src/protocol/body/subscription_group_wrapper.rs (1)

77-77: Good fix: timestamp comparison direction now matches creation order

Using <= reflects that a freshly created DataVersion::default() will typically have a later timestamp than the one created inside SubscriptionGroupWrapper::new(), avoiding time-based flakiness in the test.
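
The ordering argument can be illustrated with a small stand-alone sketch. `Version` and `Wrapper` below are hypothetical stand-ins, not the real `DataVersion` or `SubscriptionGroupWrapper`; the only assumption carried over is that the default value stamps itself with the current wall-clock time.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for a version stamp that records its creation time.
struct Version {
    timestamp: u128,
}

impl Default for Version {
    fn default() -> Self {
        // Milliseconds since the Unix epoch at the moment of construction.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before the Unix epoch")
            .as_millis();
        Version { timestamp }
    }
}

/// Hypothetical wrapper that creates its version stamp during construction.
struct Wrapper {
    version: Version,
}

impl Wrapper {
    fn new() -> Self {
        Wrapper {
            version: Version::default(),
        }
    }
}
```

A value built inside `Wrapper::new()` is stamped first, so comparing it with a `Version::default()` created afterwards holds with `<=` (barring a backwards wall-clock adjustment), whereas `>=` only holds when both land in the same millisecond.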

@rocketmq-rust-bot
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR adds zone-aware topic route filtering functionality to the RocketMQ name server by implementing a ZoneRouteRPCHook that intercepts topic route responses and filters broker data based on zone configuration.

  • Implements ZoneRouteRPCHook struct with RPC hook functionality for zone-aware route filtering
  • Adds filtering logic that preserves brokers in the same zone or brokers with master down for failover scenarios
  • Updates the route module to include the new zone route RPC hook module

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| rocketmq-namesrv/src/route/zone_route_rpc_hook.rs | New file implementing zone-aware route filtering with RPCHook trait and filter logic |
| rocketmq-namesrv/src/route.rs | Module declaration for the new zone_route_rpc_hook module |

return Ok(());
}

let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME);

Copilot AI Sep 10, 2025

This unwrap() panics if get_ext_fields() returns None. The check on line 52 already rules that out on this path, but the ext map bound by that check should be reused instead of calling get_ext_fields().unwrap() a second time.

Suggested change
- let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME);
+ // Use ext from above instead of calling get_ext_fields() again
+ let zone_name = ext.get(mix_all::ZONE_NAME);

}

pub fn filter_by_zone_name(topic_route_data: &mut TopicRouteData, zone_name: &CheetahString) {
use std::collections::HashMap;

Copilot AI Sep 10, 2025

The use statement should be placed at the top of the file with other imports rather than inside the function. Move this import to the module-level imports section.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (2)

64-71: Avoid a second unwrap and validate ZONE_NAME once.

Reuse the ext map, avoid unwrap, and short-circuit on missing/empty zone name. Optionally trim whitespace.

-        let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME);
-        if zone_name.is_none() {
-            return Ok(());
-        }
-        let zone_name = zone_name.unwrap();
-        if zone_name.is_empty() {
-            return Ok(());
-        }
+        let zone_name = match ext.get(mix_all::ZONE_NAME) {
+            Some(z) if !z.is_empty() => z,
+            _ => return Ok(()),
+        };
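
The match-with-guard pattern suggested here can be tried in isolation. The following is a minimal sketch over a plain `HashMap<String, String>`; `ZONE_NAME_KEY` and `requested_zone` are hypothetical names, and the key string is a placeholder rather than the actual value of `mix_all::ZONE_NAME`.

```rust
use std::collections::HashMap;

/// Placeholder key; the PR reads the real one from `mix_all::ZONE_NAME`.
const ZONE_NAME_KEY: &str = "zone_name";

/// Returns the requested zone, or `None` when the field is missing or empty.
/// A single lookup plus a match guard replaces the is_none/unwrap/is_empty chain.
fn requested_zone(ext: &HashMap<String, String>) -> Option<&str> {
    match ext.get(ZONE_NAME_KEY) {
        Some(z) if !z.is_empty() => Some(z.as_str()),
        _ => None,
    }
}
```

In the hook itself the `_` arm would return `Ok(())` early, as in the diff above, instead of producing `None`.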

86-104: Reduce cloning and pass count using retain + tracked removals.

Current approach clones each BrokerData/QueueData and does multiple passes. Retain in-place and track removed names/addresses to prune queues and filter servers in O(n) without clones.

-    let mut broker_data_reserved = Vec::new();
-    let mut broker_data_removed: HashMap<CheetahString, BrokerData> = HashMap::new();
-
-    for bd in topic_route_data.broker_datas.iter() {
-        let addrs = bd.broker_addrs();
-        let master_down = !addrs.contains_key(&MASTER_ID);
-        let same_zone = bd
-            .zone_name()
-            .map(|z| z.eq_ignore_ascii_case(zone_name))
-            .unwrap_or(false);
-        if master_down || same_zone {
-            broker_data_reserved.push(bd.clone());
-        } else {
-            broker_data_removed.insert(bd.broker_name().clone(), bd.clone());
-        }
-    }
-    topic_route_data.broker_datas = broker_data_reserved;
-
-    // Filter queue data
-    let mut queue_data_reserved = Vec::new();
-    for qd in topic_route_data.queue_datas.iter() {
-        if !broker_data_removed.contains_key(&qd.broker_name) {
-            queue_data_reserved.push(qd.clone());
-        }
-    }
-    topic_route_data.queue_datas = queue_data_reserved;
-
-    // Remove filter server entries whose broker addresses belong to removed brokers
-    if !topic_route_data.filter_server_table.is_empty() {
-        for (_, bd) in broker_data_removed.iter() {
-            for addr in bd.broker_addrs().values() {
-                topic_route_data.filter_server_table.remove(addr);
-            }
-        }
-    }
+    use std::collections::HashSet;
+    let mut removed_names: HashSet<CheetahString> = HashSet::new();
+    let mut removed_addrs: Vec<CheetahString> = Vec::new();
+
+    topic_route_data.broker_datas.retain(|bd| {
+        let addrs = bd.broker_addrs();
+        let master_down = !addrs.contains_key(&MASTER_ID);
+        let same_zone = bd
+            .zone_name()
+            .map(|z| z.eq_ignore_ascii_case(zone_name))
+            .unwrap_or(false);
+        let keep = master_down || same_zone;
+        if !keep {
+            removed_names.insert(bd.broker_name().clone());
+            removed_addrs.extend(addrs.values().cloned());
+        }
+        keep
+    });
+
+    topic_route_data
+        .queue_datas
+        .retain(|qd| !removed_names.contains(&qd.broker_name));
+
+    for addr in removed_addrs {
+        topic_route_data.filter_server_table.remove(&addr);
+    }

Also applies to: 106-114, 117-123
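
For readers who want to run the retain-based approach outside the crate, here is a minimal sketch with simplified types. `BrokerEntry`, `QueueEntry`, `RouteTable`, and `prune` are hypothetical names, and the keep decision is passed in as a closure so the sketch stays independent of the zone rule.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified broker entry.
struct BrokerEntry {
    name: String,
    addrs: Vec<String>,
}

/// Hypothetical, simplified queue entry.
struct QueueEntry {
    broker_name: String,
}

/// Hypothetical, simplified route table.
struct RouteTable {
    brokers: Vec<BrokerEntry>,
    queues: Vec<QueueEntry>,
    /// broker address -> filter server addresses
    filter_servers: HashMap<String, Vec<String>>,
}

/// Drops brokers rejected by `keep` in place, then removes the queues and
/// filter-server entries that belonged to the dropped brokers.
fn prune(route: &mut RouteTable, keep: impl Fn(&BrokerEntry) -> bool) {
    let mut removed_names: HashSet<String> = HashSet::new();
    let mut removed_addrs: Vec<String> = Vec::new();

    // Single pass over brokers; only names and addresses of removed
    // brokers are cloned, never the kept entries.
    route.brokers.retain(|b| {
        let keep_it = keep(b);
        if !keep_it {
            removed_names.insert(b.name.clone());
            removed_addrs.extend(b.addrs.iter().cloned());
        }
        keep_it
    });

    route
        .queues
        .retain(|q| !removed_names.contains(&q.broker_name));

    for addr in &removed_addrs {
        route.filter_servers.remove(addr);
    }
}
```

The design trade-off is that removed brokers are no longer available as whole values afterwards; only their names and addresses are tracked, which is all the later pruning steps need.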

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0c1898 and bd994a2.

📒 Files selected for processing (2)
  • rocketmq-namesrv/src/route.rs (1 hunks)
  • rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (2)
rocketmq-remoting/src/protocol/remoting_command.rs (1)
  • code (578-580)
rocketmq-remoting/src/protocol/route/route_data_view.rs (1)
  • zone_name (130-132)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: test
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-namesrv/src/route/zone_route_rpc_hook.rs (1)

75-75: Drop the set_body_mut_ref ownership check. It takes impl Into<Bytes> (e.g., consumes a Vec<u8> into a Bytes), so it owns the data just like set_body and can’t dangle.
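
The ownership point can be illustrated with a stand-alone sketch. `MockResponse` and its `set_body` method are hypothetical, and `Vec<u8>` stands in for `Bytes` so that the example needs only the standard library; the real signature is the one described in the comment above.

```rust
/// Hypothetical response type; `Vec<u8>` stands in for `Bytes`.
#[derive(Default)]
struct MockResponse {
    body: Option<Vec<u8>>,
}

impl MockResponse {
    /// Takes the argument by value and converts it, so the response owns
    /// its body and holds no borrow of the caller's data.
    fn set_body(&mut self, body: impl Into<Vec<u8>>) {
        self.body = Some(body.into());
    }
}
```

Because the argument is moved into the method, the caller's buffer can go out of scope immediately after the call without affecting the stored body.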


pub(crate) mod batch_unregistration_service;
pub mod route_info_manager;
pub(crate) mod zone_route_rpc_hook;
Contributor

💡 Verification agent

🧩 Analysis chain

Module addition looks good; ensure the hook is actually registered.

The module is declared but wiring isn’t shown. Verify namesrv runtime installs ZoneRouteRPCHook on the remoting client/server for GetRouteinfoByTopic requests.


🏁 Script executed:

#!/bin/bash
# Find hook usage/registration.
rg -n -C3 'ZoneRouteRPCHook|RPCHook' rocketmq-namesrv rocketmq-remoting --type=rust
# Grep namesrv startup/bootstrapping code.
rg -n -C3 'Namesrv|NameSrv|start|bootstrap' rocketmq-namesrv --type=rust

Length of output: 34591


Register ZoneRouteRPCHook in initiate_rpc_hooks
In rocketmq-namesrv/src/bootstrap.rs, the initiate_rpc_hooks() method still only logs “RPC hooks are not supported yet.” Replace the stub with calls that register ZoneRouteRPCHook on both the remoting client and server so that GetRouteinfoByTopic requests pass through the hook.

🤖 Prompt for AI Agents
In rocketmq-namesrv/src/bootstrap.rs around the initiate_rpc_hooks() stub,
replace the placeholder log with actual registration of ZoneRouteRPCHook: import
the zone_route_rpc_hook module (already declared in route.rs), instantiate the
ZoneRouteRPCHook and call the remoting client and remoting server hook
registration APIs to add the hook (e.g., remoting_client.register_rpc_hook(...)
and remoting_server.register_rpc_hook(...) or the equivalent methods your
remoting types expose) so that GetRouteinfoByTopic requests are intercepted;
propagate or log any errors from registration and ensure the hook instance lives
long enough (store it or register a boxed/arc reference as required by the API).
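
What "register the hook and keep it alive" could look like is sketched below. Every name here is hypothetical (`Hook`, `HookServer`, `register_hook`, `TagHook`) and requests and responses are reduced to strings; the actual registration methods on the remoting client and server are not shown in this thread, so this sketch does not describe them.

```rust
use std::sync::Arc;

/// Hypothetical hook trait with a no-op pre-request step and a
/// post-response step, mirroring the shape described in the walkthrough.
trait Hook: Send + Sync {
    fn before_request(&self, _request: &str) {}
    fn after_response(&self, request: &str, response: &mut String);
}

/// Hypothetical server that owns its registered hooks, so each hook
/// lives as long as the server does.
#[derive(Default)]
struct HookServer {
    hooks: Vec<Arc<dyn Hook>>,
}

impl HookServer {
    fn register_hook(&mut self, hook: Arc<dyn Hook>) {
        self.hooks.push(hook);
    }

    /// Builds a response, then lets every registered hook post-process it.
    fn handle(&self, request: &str) -> String {
        for hook in &self.hooks {
            hook.before_request(request);
        }
        let mut response = format!("route-for:{}", request);
        for hook in &self.hooks {
            hook.after_response(request, &mut response);
        }
        response
    }
}

/// Example hook that tags responses; a stand-in for zone filtering.
struct TagHook;

impl Hook for TagHook {
    fn after_response(&self, _request: &str, response: &mut String) {
        response.push_str("[filtered]");
    }
}
```

Storing `Arc<dyn Hook>` in the server is one way to satisfy the "lives long enough" requirement, and it lets the same hook instance be shared between a client and a server if both need it.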

Comment on lines +52 to +59
let zone_mode = if let Some(ext) = request.get_ext_fields() {
    ext.get(mix_all::ZONE_MODE)
        .unwrap_or(&"false".into())
        .parse::<bool>()
        .unwrap_or(false)
} else {
    return Ok(());
};
Contributor

🛠️ Refactor suggestion

⚠️ Potential issue

Fix temporary reference in unwrap_or and make boolean parsing robust.

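ext.get(...).unwrap_or(&"false".into()) borrows a temporary. It compiles here only because the temporary lives until the end of the statement and the parsed bool is an owned value; holding that reference past the statement would fail with “temporary value dropped while borrowed”. Also, parse::<bool>() only accepts the exact strings "true" and "false". Prefer explicit, case-insensitive checks and avoid the temporary.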

-        let zone_mode = if let Some(ext) = request.get_ext_fields() {
-            ext.get(mix_all::ZONE_MODE)
-                .unwrap_or(&"false".into())
-                .parse::<bool>()
-                .unwrap_or(false)
-        } else {
-            return Ok(());
-        };
+        let ext = match request.get_ext_fields() {
+            Some(ext) => ext,
+            None => return Ok(()),
+        };
+        let zone_mode = ext
+            .get(mix_all::ZONE_MODE)
+            .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
+            .unwrap_or(false);
🤖 Prompt for AI Agents
In rocketmq-namesrv/src/route/zone_route_rpc_hook.rs around lines 52 to 59, the
code uses ext.get(...).unwrap_or(&"false".into()), which borrows a temporary that
lives only until the end of the statement, and then uses parse::<bool>(), which
only accepts the exact strings "true"/"false". Replace that logic by first
obtaining the Option reference from request.get_ext_fields(), early-returning if
None (keep the existing return Ok(())), then fetch the entry with
ext.get(mix_all::ZONE_MODE).map(|s| s.as_str()) and compute zone_mode by
comparing case-insensitively (e.g. s.eq_ignore_ascii_case("true")) or defaulting
to false when None; this avoids borrowing a temporary and handles "True"/"TRUE"
variants robustly.
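
The difference between strict and lenient flag parsing is easy to check in isolation. In the sketch below, `parse_flag` and `zone_mode` are hypothetical helpers over plain strings; accepting "1" follows the suggested diff and is an assumption about what clients may send.

```rust
use std::collections::HashMap;

/// Interprets a flag value leniently: "true" in any ASCII case, or "1".
/// A missing value and anything else count as false.
fn parse_flag(value: Option<&str>) -> bool {
    value
        .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
        .unwrap_or(false)
}

/// Hypothetical lookup helper over a plain string map of ext fields.
fn zone_mode(ext: &HashMap<String, String>, key: &str) -> bool {
    parse_flag(ext.get(key).map(String::as_str))
}
```

By contrast, the standard library's `"True".parse::<bool>()` returns an error, so the original `.parse::<bool>().unwrap_or(false)` silently treats "True" and "TRUE" as false.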

Comment on lines +73 to +76
let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
filter_by_zone_name(&mut topic_route_data, zone_name);
response.set_body_mut_ref(topic_route_data.encode()?);
Ok(())
Contributor

🛠️ Refactor suggestion

⚠️ Potential issue

Don’t convert a successful RPC into an error when decode/encode fails.

? will bubble errors from the hook and fail the call. Make filtering best-effort: if (de)serialization fails, skip filtering and return Ok.

-        let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
-        filter_by_zone_name(&mut topic_route_data, zone_name);
-        response.set_body_mut_ref(topic_route_data.encode()?);
+        if let Ok(mut topic_route_data) = TopicRouteData::decode(response.get_body().unwrap()) {
+            filter_by_zone_name(&mut topic_route_data, zone_name);
+            if let Ok(bytes) = topic_route_data.encode() {
+                response.set_body_mut_ref(bytes);
+            }
+        }
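
The best-effort behavior can be sketched without the real codec. Below, `decode`, `encode`, and `filter_best_effort` are hypothetical, and the body format (comma-separated broker names) is invented for illustration; the point is only that a decode failure leaves the original body untouched instead of failing the call.

```rust
/// Hypothetical codec: a body is a comma-separated list of broker names.
fn decode(body: &[u8]) -> Result<Vec<String>, std::str::Utf8Error> {
    let text = std::str::from_utf8(body)?;
    Ok(text
        .split(',')
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect())
}

fn encode(brokers: &[String]) -> Vec<u8> {
    brokers.join(",").into_bytes()
}

/// Filters the body in place when it decodes; otherwise leaves it untouched.
/// Returns whether filtering was applied.
fn filter_best_effort(body: &mut Vec<u8>, keep: impl Fn(&str) -> bool) -> bool {
    match decode(body.as_slice()) {
        Ok(mut brokers) => {
            brokers.retain(|b| keep(b.as_str()));
            *body = encode(&brokers);
            true
        }
        // Skip filtering rather than turning a successful response into an error.
        Err(_) => false,
    }
}
```

One consequence of this design is that a client may receive an unfiltered route when decoding fails, so logging the skipped case is probably worthwhile in a real hook.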

Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit be818c6 into main Sep 10, 2025
16 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Sep 10, 2025

codecov bot commented Sep 10, 2025

Codecov Report

❌ Patch coverage is 1.40845% with 70 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.62%. Comparing base (f0c1898) to head (1321487).
⚠️ Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| rocketmq-namesrv/src/route/zone_route_rpc_hook.rs | 0.00% | 70 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4012      +/-   ##
==========================================
- Coverage   26.64%   26.62%   -0.03%     
==========================================
  Files         564      565       +1     
  Lines       80510    80580      +70     
==========================================
  Hits        21453    21453              
- Misses      59057    59127      +70     

@mxsm mxsm deleted the feature-4011 branch September 11, 2025 14:34

Development

Successfully merging this pull request may close these issues.

Enhancement: Add ZoneRouteRPCHook for zone-aware topic route filtering

3 participants