-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4011]🔥Add ZoneRouteRPCHook for zone-aware topic route filtering #4012
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,3 +17,4 @@ | |
|
||
pub(crate) mod batch_unregistration_service; | ||
pub mod route_info_manager; | ||
pub(crate) mod zone_route_rpc_hook; | ||
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,124 @@ | ||||||||||||||||||||||||||||||||||
/* | ||||||||||||||||||||||||||||||||||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||||||||||||||||||||||||||
* contributor license agreements. See the NOTICE file distributed with | ||||||||||||||||||||||||||||||||||
* this work for additional information regarding copyright ownership. | ||||||||||||||||||||||||||||||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||||||||||||||||||||||||||
* (the "License"); you may not use this file except in compliance with | ||||||||||||||||||||||||||||||||||
* the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||
* Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||||||
* See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||
* limitations under the License. | ||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||
use cheetah_string::CheetahString; | ||||||||||||||||||||||||||||||||||
use rocketmq_common::common::mix_all; | ||||||||||||||||||||||||||||||||||
use rocketmq_common::common::mix_all::MASTER_ID; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::code::request_code::RequestCode; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::code::response_code::ResponseCode; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::route::route_data_view::BrokerData; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::RemotingDeserializable; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::RemotingSerializable; | ||||||||||||||||||||||||||||||||||
use rocketmq_remoting::runtime::RPCHook; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
pub struct ZoneRouteRPCHook; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
impl RPCHook for ZoneRouteRPCHook { | ||||||||||||||||||||||||||||||||||
#[inline(always)] | ||||||||||||||||||||||||||||||||||
fn do_before_request( | ||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||
_remote_addr: std::net::SocketAddr, | ||||||||||||||||||||||||||||||||||
_request: &mut rocketmq_remoting::protocol::remoting_command::RemotingCommand, | ||||||||||||||||||||||||||||||||||
) -> rocketmq_error::RocketMQResult<()> { | ||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
fn do_after_response( | ||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||
_remote_addr: std::net::SocketAddr, | ||||||||||||||||||||||||||||||||||
request: &rocketmq_remoting::protocol::remoting_command::RemotingCommand, | ||||||||||||||||||||||||||||||||||
response: &mut rocketmq_remoting::protocol::remoting_command::RemotingCommand, | ||||||||||||||||||||||||||||||||||
) -> rocketmq_error::RocketMQResult<()> { | ||||||||||||||||||||||||||||||||||
if RequestCode::GetRouteinfoByTopic as i32 != request.code() { | ||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
if response.get_body().is_none() || ResponseCode::Success as i32 != response.code() { | ||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
let zone_mode = if let Some(ext) = request.get_ext_fields() { | ||||||||||||||||||||||||||||||||||
ext.get(mix_all::ZONE_MODE) | ||||||||||||||||||||||||||||||||||
.unwrap_or(&"false".into()) | ||||||||||||||||||||||||||||||||||
.parse::<bool>() | ||||||||||||||||||||||||||||||||||
.unwrap_or(false) | ||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||
Comment on lines
+52
to
+59
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Fix temporary reference in
- let zone_mode = if let Some(ext) = request.get_ext_fields() {
- ext.get(mix_all::ZONE_MODE)
- .unwrap_or(&"false".into())
- .parse::<bool>()
- .unwrap_or(false)
- } else {
- return Ok(());
- };
+ let ext = match request.get_ext_fields() {
+ Some(ext) => ext,
+ None => return Ok(()),
+ };
+ let zone_mode = ext
+ .get(mix_all::ZONE_MODE)
+ .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
+ .unwrap_or(false); 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||
if !zone_mode { | ||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
let zone_name = request.get_ext_fields().unwrap().get(mix_all::ZONE_NAME); | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line will panic if
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||||||||
if zone_name.is_none() { | ||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
let zone_name = zone_name.unwrap(); | ||||||||||||||||||||||||||||||||||
if zone_name.is_empty() { | ||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?; | ||||||||||||||||||||||||||||||||||
filter_by_zone_name(&mut topic_route_data, zone_name); | ||||||||||||||||||||||||||||||||||
response.set_body_mut_ref(topic_route_data.encode()?); | ||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||
Comment on lines
+73
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Don’t convert a successful RPC into an error when decode/encode fails.
- let mut topic_route_data = TopicRouteData::decode(response.get_body().unwrap())?;
- filter_by_zone_name(&mut topic_route_data, zone_name);
- response.set_body_mut_ref(topic_route_data.encode()?);
+ if let Ok(mut topic_route_data) = TopicRouteData::decode(response.get_body().unwrap()) {
+ filter_by_zone_name(&mut topic_route_data, zone_name);
+ if let Ok(bytes) = topic_route_data.encode() {
+ response.set_body_mut_ref(bytes);
+ }
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
pub fn filter_by_zone_name(topic_route_data: &mut TopicRouteData, zone_name: &CheetahString) { | ||||||||||||||||||||||||||||||||||
use std::collections::HashMap; | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
let mut broker_data_reserved = Vec::new(); | ||||||||||||||||||||||||||||||||||
let mut broker_data_removed: HashMap<CheetahString, BrokerData> = HashMap::new(); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
for bd in topic_route_data.broker_datas.iter() { | ||||||||||||||||||||||||||||||||||
let addrs = bd.broker_addrs(); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
// master down => keep (consume from slave) | ||||||||||||||||||||||||||||||||||
let master_down = !addrs.contains_key(&MASTER_ID); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
let same_zone = bd | ||||||||||||||||||||||||||||||||||
.zone_name() | ||||||||||||||||||||||||||||||||||
.map(|z| z.eq_ignore_ascii_case(zone_name)) | ||||||||||||||||||||||||||||||||||
.unwrap_or(false); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if master_down || same_zone { | ||||||||||||||||||||||||||||||||||
broker_data_reserved.push(bd.clone()); | ||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||
broker_data_removed.insert(bd.broker_name().clone(), bd.clone()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
topic_route_data.broker_datas = broker_data_reserved; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
// Filter queue data | ||||||||||||||||||||||||||||||||||
let mut queue_data_reserved = Vec::new(); | ||||||||||||||||||||||||||||||||||
for qd in topic_route_data.queue_datas.iter() { | ||||||||||||||||||||||||||||||||||
if !broker_data_removed.contains_key(&qd.broker_name) { | ||||||||||||||||||||||||||||||||||
queue_data_reserved.push(qd.clone()); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
topic_route_data.queue_datas = queue_data_reserved; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
// Remove filter server entries whose broker addresses belong to removed brokers | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if !topic_route_data.filter_server_table.is_empty() { | ||||||||||||||||||||||||||||||||||
for (_, bd) in broker_data_removed.iter() { | ||||||||||||||||||||||||||||||||||
for addr in bd.broker_addrs().values() { | ||||||||||||||||||||||||||||||||||
topic_route_data.filter_server_table.remove(addr); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Module addition looks good; ensure the hook is actually registered.
The module is declared but wiring isn’t shown. Verify namesrv runtime installs
ZoneRouteRPCHook
on the remoting client/server for GetRouteinfoByTopic requests.🏁 Script executed:
Length of output: 34591
Register ZoneRouteRPCHook in initiate_rpc_hooks
In rocketmq-namesrv/src/bootstrap.rs the initiate_rpc_hooks() method still only logs “RPC hooks are not supported yet.” Replace the stub with calls to register ZoneRouteRPCHook on both the remoting client and server so that GetRouteInfoByTopic requests use the hook.
🤖 Prompt for AI Agents