Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rocketmq-namesrv/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@

pub(crate) mod batch_unregistration_service;
pub mod route_info_manager;
pub(crate) mod zone_route_rpc_hook;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Module addition looks good; ensure the hook is actually registered.

The module is declared but wiring isn’t shown. Verify namesrv runtime installs ZoneRouteRPCHook on the remoting client/server for GetRouteinfoByTopic requests.


🏁 Script executed:

#!/bin/bash
# Find hook usage/registration.
rg -n -C3 'ZoneRouteRPCHook|RPCHook' rocketmq-namesrv rocketmq-remoting --type=rust
# Grep namesrv startup/bootstrapping code.
rg -n -C3 'Namesrv|NameSrv|start|bootstrap' rocketmq-namesrv --type=rust

Length of output: 34591


Register ZoneRouteRPCHook in initiate_rpc_hooks
In rocketmq-namesrv/src/bootstrap.rs the initiate_rpc_hooks() method still only logs “RPC hooks are not supported yet.” Replace the stub with calls to register ZoneRouteRPCHook on both the remoting client and server so that GetRouteInfoByTopic requests use the hook.

🤖 Prompt for AI Agents
In rocketmq-namesrv/src/bootstrap.rs around the initiate_rpc_hooks() stub,
replace the placeholder log with actual registration of ZoneRouteRPCHook: import
the zone_route_rpc_hook module (already declared in route.rs), instantiate the
ZoneRouteRPCHook and call the remoting client and remoting server hook
registration APIs to add the hook (e.g., remoting_client.register_rpc_hook(...)
and remoting_server.register_rpc_hook(...) or the equivalent methods your
remoting types expose) so that GetRouteInfoByTopic requests are intercepted;
propagate or log any errors from registration and ensure the hook instance lives
long enough (store it or register a boxed/arc reference as required by the API).

124 changes: 124 additions & 0 deletions rocketmq-namesrv/src/route/zone_route_rpc_hook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use cheetah_string::CheetahString;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::mix_all::MASTER_ID;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::protocol::route::route_data_view::BrokerData;
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::runtime::RPCHook;

pub struct ZoneRouteRPCHook;

impl RPCHook for ZoneRouteRPCHook {
#[inline(always)]
fn do_before_request(
&self,
_remote_addr: std::net::SocketAddr,
_request: &mut rocketmq_remoting::protocol::remoting_command::RemotingCommand,
) -> rocketmq_error::RocketMQResult<()> {
Ok(())
}

fn do_after_response(
&self,
_remote_addr: std::net::SocketAddr,
request: &rocketmq_remoting::protocol::remoting_command::RemotingCommand,
response: &mut rocketmq_remoting::protocol::remoting_command::RemotingCommand,
) -> rocketmq_error::RocketMQResult<()> {
if RequestCode::GetRouteinfoByTopic as i32 != request.code() {
return Ok(());
}
if response.get_body().is_none() || ResponseCode::Success as i32 != response.code() {
return Ok(());
}
let zone_mode = if let Some(ext) = request.get_ext_fields() {
ext.get(mix_all::ZONE_MODE)
.unwrap_or(&"false".into())
.parse::<bool>()
.unwrap_or(false)
} else {
return Ok(());
};
Comment on lines +52 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Fix temporary reference in unwrap_or and make boolean parsing robust.

ext.get(...).unwrap_or(&"false".into()) borrows a temporary and will not compile (“temporary value dropped while borrowed”). Also, parse::<bool>() only accepts lowercase "true"/"false". Prefer explicit, case-insensitive checks and avoid the temporary.

-        let zone_mode = if let Some(ext) = request.get_ext_fields() {
-            ext.get(mix_all::ZONE_MODE)
-                .unwrap_or(&"false".into())
-                .parse::<bool>()
-                .unwrap_or(false)
-        } else {
-            return Ok(());
-        };
+        let ext = match request.get_ext_fields() {
+            Some(ext) => ext,
+            None => return Ok(()),
+        };
+        let zone_mode = ext
+            .get(mix_all::ZONE_MODE)
+            .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
+            .unwrap_or(false);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let zone_mode = if let Some(ext) = request.get_ext_fields() {
ext.get(mix_all::ZONE_MODE)
.unwrap_or(&"false".into())
.parse::<bool>()
.unwrap_or(false)
} else {
return Ok(());
};
let ext = match request.get_ext_fields() {
Some(ext) => ext,
None => return Ok(()),
};
let zone_mode = ext
.get(mix_all::ZONE_MODE)
.map(|v| v.eq_ignore_ascii_case("true") || v == "1")
.unwrap_or(false);
🤖 Prompt for AI Agents
In rocketmq-namesrv/src/route/zone_route_rpc_hook.rs around lines 52 to 59, the
code uses ext.get(...).unwrap_or(&"false".into()) which borrows a temporary and
will not compile, and then uses parse::<bool>() which only accepts lowercase
"true"/"false". Replace that logic by first obtaining the Option reference from
request.get_ext_fields(), early-returning if None (keep the existing return
Ok(())), then fetch the entry with ext.get(mix_all::ZONE_MODE).map(|s|
s.as_str()) and compute zone_mode by comparing case-insensitively (e.g.
s.eq_ignore_ascii_case("true")) or defaulting to false when None; this avoids
borrowing a temporary and handles "True"/"FALSE" variants robustly.

if !zone_mode {
return Ok(());
}

let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME);
Copy link

Copilot AI Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line will panic if get_ext_fields() returns None. Since you already checked for ext existence on line 52, you should reuse that variable instead of calling unwrap() again.

Suggested change
let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME);
// Use ext from above instead of calling get_ext_fields() again
let zone_name = ext.get(mix_all::ZONE_NAME);

Copilot uses AI. Check for mistakes.

if zone_name.is_none() {
return Ok(());
}
let zone_name = zone_name.unwrap();
if zone_name.is_empty() {
return Ok(());
}

let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
filter_by_zone_name(&mut topic_route_data, zone_name);
response.set_body_mut_ref(topic_route_data.encode()?);
Ok(())
Comment on lines +73 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Don’t convert a successful RPC into an error when decode/encode fails.

? will bubble errors from the hook and fail the call. Make filtering best-effort: if (de)serialization fails, skip filtering and return Ok.

-        let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
-        filter_by_zone_name(&mut topic_route_data, zone_name);
-        response.set_body_mut_ref(topic_route_data.encode()?);
+        if let Ok(mut topic_route_data) = TopicRouteData::decode(response.get_body().unwrap()) {
+            filter_by_zone_name(&mut topic_route_data, zone_name);
+            if let Ok(bytes) = topic_route_data.encode() {
+                response.set_body_mut_ref(bytes);
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
filter_by_zone_name(&mut topic_route_data, zone_name);
response.set_body_mut_ref(topic_route_data.encode()?);
Ok(())
if let Ok(mut topic_route_data) = TopicRouteData::decode(response.get_body().unwrap()) {
filter_by_zone_name(&mut topic_route_data, zone_name);
if let Ok(bytes) = topic_route_data.encode() {
response.set_body_mut_ref(bytes);
}
}
Ok(())

}
}

pub fn filter_by_zone_name(topic_route_data: &mut TopicRouteData, zone_name: &CheetahString) {
use std::collections::HashMap;
Copy link

Copilot AI Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use statement should be placed at the top of the file with other imports rather than inside the function. Move this import to the module-level imports section.

Copilot uses AI. Check for mistakes.


let mut broker_data_reserved = Vec::new();
let mut broker_data_removed: HashMap<CheetahString, BrokerData> = HashMap::new();

for bd in topic_route_data.broker_datas.iter() {
let addrs = bd.broker_addrs();

// master down => keep (consume from slave)
let master_down = !addrs.contains_key(&MASTER_ID);

let same_zone = bd
.zone_name()
.map(|z| z.eq_ignore_ascii_case(zone_name))
.unwrap_or(false);

if master_down || same_zone {
broker_data_reserved.push(bd.clone());
} else {
broker_data_removed.insert(bd.broker_name().clone(), bd.clone());
}
}

topic_route_data.broker_datas = broker_data_reserved;

// Filter queue data
let mut queue_data_reserved = Vec::new();
for qd in topic_route_data.queue_datas.iter() {
if !broker_data_removed.contains_key(&qd.broker_name) {
queue_data_reserved.push(qd.clone());
}
}
topic_route_data.queue_datas = queue_data_reserved;

// Remove filter server entries whose broker addresses belong to removed brokers

if !topic_route_data.filter_server_table.is_empty() {
for (_, bd) in broker_data_removed.iter() {
for addr in bd.broker_addrs().values() {
topic_route_data.filter_server_table.remove(addr);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ mod tests {
let wrapper = SubscriptionGroupWrapper::new();

assert_eq!(wrapper.subscription_group_table.len(), 0);
assert!(wrapper.data_version.timestamp >= DataVersion::default().timestamp);
assert!(wrapper.data_version.timestamp <= DataVersion::default().timestamp);
}

#[test]
Expand Down
Loading