-
Notifications
You must be signed in to change notification settings - Fork 175
notify broker role changed #4058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8d9e5e2
f2f5836
293924f
5305d54
daad8ef
2547c9b
79344ea
1471dd5
c15c6f3
b0f772a
7851482
afdaf8d
8a8fce3
883d018
ae12384
aef0e5c
ec9ab12
4f6b8a4
a9853b6
ff7327f
8a10f21
2086221
a53fca7
736c7cd
bb9525e
974c45e
1662aa0
10ca5d0
f725263
f4f481a
89ef4e3
7dd7612
c8058f1
cbc0163
b54d637
08f5673
727552a
e9349f6
25e5a6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| use rocketmq_remoting::code::request_code::RequestCode; | ||
| use rocketmq_remoting::code::response_code::ResponseCode; | ||
| use rocketmq_remoting::net::channel::Channel; | ||
| use rocketmq_remoting::protocol::body::sync_state_set_body::SyncStateSet; | ||
| use rocketmq_remoting::protocol::header::notify_broker_role_change_request_header::NotifyBrokerRoleChangedRequestHeader; | ||
| use rocketmq_remoting::protocol::remoting_command::RemotingCommand; | ||
| use rocketmq_remoting::protocol::RemotingDeserializable; | ||
| use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; | ||
| use rocketmq_rust::ArcMut; | ||
| use rocketmq_store::base::message_store::MessageStore; | ||
| use tracing::info; | ||
|
|
||
| use crate::broker_runtime::BrokerRuntimeInner; | ||
|
|
||
| #[derive(Clone)] | ||
| pub struct NotifyBrokerRoleChangeHandler<MS: MessageStore> { | ||
| broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>, | ||
| } | ||
|
|
||
| impl<MS: MessageStore> NotifyBrokerRoleChangeHandler<MS> { | ||
| pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self { | ||
| Self { | ||
| broker_runtime_inner, | ||
| } | ||
| } | ||
|
|
||
| pub async fn notify_broker_role_changed( | ||
| &mut self, | ||
| _channel: Channel, | ||
| _ctx: ConnectionHandlerContext, | ||
| _request_code: RequestCode, | ||
| request: &mut RemotingCommand, | ||
| ) -> Option<RemotingCommand> { | ||
| let request_header = | ||
| request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>(); | ||
|
|
||
| let sync_state_set_info = | ||
| SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default(); | ||
|
|
||
| let response = RemotingCommand::create_response_command(); | ||
|
|
||
| info!( | ||
| "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", | ||
| request_header.as_ref().expect("null") | ||
| ); | ||
|
|
||
|
Comment on lines
+51
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Header decode + logging can panic; handle errors without crashing.
-use tracing::info;
+use tracing::{info, error};
@@
- let request_header =
- request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();
+ let mut response = RemotingCommand::create_response_command();
+ let request_header = match request
+ .decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>()
+ {
+ Ok(h) => h,
+ Err(e) => {
+ error!("Failed to decode NotifyBrokerRoleChangedRequestHeader: {}", e);
+ return Some(response.set_code(ResponseCode::SystemError));
+ }
+ };
@@
- let response = RemotingCommand::create_response_command();
-
- info!(
- "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
- request_header.as_ref().expect("null")
- );
+ info!(
+ "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
+ request_header
+ );
|
||
| if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { | ||
| if let Ok(request_header) = request_header { | ||
| match replicas_mangesr | ||
| .change_broker_role( | ||
| request_header.master_broker_id, | ||
| request_header.master_address, | ||
| request_header.master_epoch, | ||
| request_header.sync_state_set_epoch, | ||
| sync_state_set_info.get_sync_state_set(), | ||
|
Comment on lines
+68
to
+72
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion 🧩 Analysis chainValidate required header fields before applying changes. If these header fields are Option, enforce presence for required ones (e.g., master_address, master_epoch, sync_state_set_epoch) and return a client/error code when missing. If master_address/master_epoch/sync_state_set_epoch are optional in the type but required semantically, add a check after decoding and before invoking change_broker_role. 🏁 Script executed: #!/bin/bash
# Inspect header field types/attributes to confirm which are Option and #[required]
rg -n -C2 'struct\s+NotifyBrokerRoleChangedRequestHeader' rocketmq-remoting/src/protocol/header
rg -n -C2 '#\[required\]' rocketmq-remoting/src/protocol/headerLength of output: 61219 Validate required header fields before applying changes. NotifyBrokerRoleChangedRequestHeader marks master_address, master_epoch, sync_state_set_epoch, master_broker_id as #[required] but their types are Option; enforce presence (match/ok_or) and return an appropriate client/error code when any are missing before calling change_broker_role. Fix in rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (around lines 68–72). 🤖 Prompt for AI Agents |
||
| ) | ||
| .await | ||
| { | ||
| Ok(_) => {} | ||
| Err(e) => { | ||
| panic!("Failed to call method change_broker_role: {}", e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Some(response.set_code(ResponseCode::Success)) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| use std::collections::HashSet; | ||
|
|
||
| use serde::Deserialize; | ||
| use serde::Serialize; | ||
|
|
||
| #[derive(Debug, Serialize, Deserialize, Default)] | ||
| #[serde(rename_all = "camelCase")] | ||
| pub struct SyncStateSet { | ||
| sync_state_set: Option<HashSet<i64>>, | ||
| sync_state_set_epoch: i32, | ||
| } | ||
|
|
||
| impl SyncStateSet { | ||
| pub fn get_sync_state_set(&self) -> Option<&HashSet<i64>> { | ||
| self.sync_state_set.as_ref() | ||
| } | ||
|
|
||
| pub fn get_sync_state_set_epoch(&self) -> i32 { | ||
| self.sync_state_set_epoch | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| use std::fmt::Display; | ||
|
|
||
| use cheetah_string::CheetahString; | ||
| use rocketmq_macros::RequestHeaderCodec; | ||
| use serde::Deserialize; | ||
| use serde::Serialize; | ||
|
|
||
| #[derive(Serialize, Deserialize, Debug, RequestHeaderCodec)] | ||
| #[serde(rename_all = "camelCase")] | ||
| pub struct NotifyBrokerRoleChangedRequestHeader { | ||
| #[required] | ||
| pub master_address: Option<CheetahString>, | ||
|
|
||
| #[required] | ||
| pub master_epoch: Option<i32>, | ||
|
|
||
| #[required] | ||
| pub sync_state_set_epoch: Option<i32>, | ||
|
|
||
| #[required] | ||
| pub master_broker_id: Option<u64>, | ||
| } | ||
|
|
||
| impl Display for NotifyBrokerRoleChangedRequestHeader { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| write!( | ||
| f, | ||
| "(master_address={:?}, master_epoch={:?}, sync_state_set_epoch={:?}, \ | ||
| master_broker_id={:?})", | ||
| self.master_address, | ||
| self.master_epoch, | ||
| self.sync_state_set_epoch, | ||
| self.master_broker_id | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unchecked body access (
unwrap) can crash the broker.Requests without a body will panic here. Decode defensively and default or reject.
📝 Committable suggestion
🤖 Prompt for AI Agents