Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
8d9e5e2
add notify min broker id change
Sep 8, 2025
f2f5836
add notify min broker id change
Sep 8, 2025
293924f
Merge remote-tracking branch 'upstream/main' into broker_id_change
Sep 8, 2025
5305d54
Merge remote-tracking branch 'upstream/main' into broker_id_change
Sep 8, 2025
daad8ef
Broker id Change function external logic implementation
Sep 9, 2025
2547c9b
Broker id Change function external logic implementation
Sep 9, 2025
79344ea
Broker id Change function external logic implementation
Sep 10, 2025
1471dd5
Implementation Notification Broker Id Change Logic
Sep 10, 2025
c15c6f3
Implementation Notification Broker Id Change Logic
Sep 10, 2025
b0f772a
Implementation Notification Broker Id Change Logic
Sep 10, 2025
7851482
Broker id Change function external logic implementation
Sep 10, 2025
afdaf8d
Broker id Change function external logic implementation
Sep 10, 2025
8a8fce3
Broker id Change function external logic implementation
Sep 10, 2025
883d018
Broker id Change function external logic implementation
Sep 10, 2025
ae12384
update broker ha info
Sep 12, 2025
aef0e5c
Broker id Change function external logic implementation
Sep 12, 2025
ec9ab12
fetch code
Sep 12, 2025
4f6b8a4
update broker ha info
Sep 12, 2025
a9853b6
update logic
Sep 12, 2025
ff7327f
update logic
Sep 12, 2025
8a10f21
merge
watchgou Sep 14, 2025
2086221
Merge remote-tracking branch 'upstream/main'
watchgou Sep 15, 2025
a53fca7
reset master flush offset
watchgou Sep 15, 2025
736c7cd
reset master flush offset
watchgou Sep 15, 2025
bb9525e
Merge remote-tracking branch 'upstream/main'
watchgou Sep 16, 2025
974c45e
Merge remote-tracking branch 'upstream/main'
watchgou Sep 16, 2025
1662aa0
Merge remote-tracking branch 'upstream/main'
watchgou Sep 16, 2025
10ca5d0
broker epoch cache handler
watchgou Sep 16, 2025
f725263
broker epoch cache handler
watchgou Sep 16, 2025
f4f481a
back ffi
watchgou Sep 16, 2025
89ef4e3
Comment out entry encode
watchgou Sep 16, 2025
7dd7612
Comment out entry encode
watchgou Sep 16, 2025
c8058f1
add object encode
watchgou Sep 16, 2025
cbc0163
update unwrap function
watchgou Sep 16, 2025
b54d637
back ffi push
watchgou Sep 16, 2025
08f5673
Merge remote-tracking branch 'upstream/main' into broker
watchgou Sep 17, 2025
727552a
notify broker role changed
watchgou Sep 17, 2025
e9349f6
Delete the imported module
watchgou Sep 17, 2025
25e5a6d
Optimization logic
watchgou Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,11 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
&self.replicas_manager
}

#[inline]
pub fn replicas_manager_mut(&mut self) -> &mut Option<ReplicasManager> {
&mut self.replicas_manager
}

#[inline]
pub fn store_host(&self) -> SocketAddr {
self.store_host
Expand Down
18 changes: 17 additions & 1 deletion rocketmq-broker/src/controller/replicas_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand All @@ -15,7 +14,13 @@ use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::collections::HashSet;

use cheetah_string::CheetahString;
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntry;
use tracing::warn;

#[derive(Default)]
pub struct ReplicasManager {}

Expand All @@ -31,4 +36,15 @@ impl ReplicasManager {
pub fn get_epoch_entries(&self) -> Vec<EpochEntry> {
unimplemented!("")
}

pub async fn change_broker_role(
&mut self,
_new_master_broker_id: Option<u64>,
_new_master_address: Option<CheetahString>,
_new_master_epoch: Option<i32>,
_sync_state_set_epoch: Option<i32>,
_sync_state_set: Option<&HashSet<i64>>,
) -> rocketmq_error::RocketMQResult<()> {
Ok(())
}
}
12 changes: 12 additions & 0 deletions rocketmq-broker/src/processor/admin_broker_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::processor::admin_broker_processor::batch_mq_handler::BatchMqHandler;
use crate::processor::admin_broker_processor::broker_config_request_handler::BrokerConfigRequestHandler;
use crate::processor::admin_broker_processor::broker_epoch_cache_handler::BrokerEpochCacheHandler;
use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler;
use crate::processor::admin_broker_processor::notify_broker_role_change_handler::NotifyBrokerRoleChangeHandler;
use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler;
use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler;
use crate::processor::admin_broker_processor::reset_master_flusg_offset_handler::ResetMasterFlushOffsetHandler;
Expand All @@ -41,6 +42,7 @@ mod batch_mq_handler;
mod broker_config_request_handler;
mod broker_epoch_cache_handler;
mod consumer_request_handler;
mod notify_broker_role_change_handler;
mod notify_min_broker_id_handler;
mod offset_request_handler;
mod reset_master_flusg_offset_handler;
Expand All @@ -62,6 +64,7 @@ pub struct AdminBrokerProcessor<MS: MessageStore> {
update_broker_ha_handler: UpdateBrokerHaHandler<MS>,
reset_master_flusg_offset_handler: ResetMasterFlushOffsetHandler<MS>,
broker_epoch_cache_handler: BrokerEpochCacheHandler<MS>,
notify_broker_role_change_handler: NotifyBrokerRoleChangeHandler<MS>,
}

impl<MS> RequestProcessor for AdminBrokerProcessor<MS>
Expand Down Expand Up @@ -102,6 +105,9 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {

let broker_epoch_cache_handler = BrokerEpochCacheHandler::new(broker_runtime_inner.clone());

let notify_broker_role_change_handler =
NotifyBrokerRoleChangeHandler::new(broker_runtime_inner.clone());

AdminBrokerProcessor {
topic_request_handler,
broker_config_request_handler,
Expand All @@ -114,6 +120,7 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
update_broker_ha_handler,
reset_master_flusg_offset_handler,
broker_epoch_cache_handler,
notify_broker_role_change_handler,
}
}
}
Expand Down Expand Up @@ -260,6 +267,11 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
.get_broker_epoch_cache(channel, ctx, request_code, request)
.await
}
RequestCode::NotifyBrokerRoleChanged => {
self.notify_broker_role_change_handler
.notify_broker_role_changed(channel, ctx, request_code, request)
.await
}
_ => Some(get_unknown_cmd_response(request_code)),
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::body::sync_state_set_body::SyncStateSet;
use rocketmq_remoting::protocol::header::notify_broker_role_change_request_header::NotifyBrokerRoleChangedRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_store::MessageStore;
use tracing::info;

use crate::broker_runtime::BrokerRuntimeInner;

#[derive(Clone)]
pub struct NotifyBrokerRoleChangeHandler<MS: MessageStore> {
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
}

impl<MS: MessageStore> NotifyBrokerRoleChangeHandler<MS> {
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
Self {
broker_runtime_inner,
}
}

pub async fn notify_broker_role_changed(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request_code: RequestCode,
request: &mut RemotingCommand,
) -> Option<RemotingCommand> {
let request_header =
request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();

let sync_state_set_info =
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();

Comment on lines +55 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unchecked body access (unwrap) can crash the broker.

Requests without a body will panic here. Decode defensively and default or reject.

-        let sync_state_set_info =
-            SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
+        let sync_state_set_info = match request.get_body() {
+            Some(body) => SyncStateSet::decode(body).unwrap_or_default(),
+            None => SyncStateSet::default(),
+        };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
let sync_state_set_info = match request.get_body() {
Some(body) => SyncStateSet::decode(body).unwrap_or_default(),
None => SyncStateSet::default(),
};
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 55-56, the code calls request.get_body().unwrap() which will panic
if the request has no body; change this to defensively handle the Option by
first checking request.get_body() and either using a default SyncStateSet (e.g.,
SyncStateSet::default()) or returning an appropriate error response/logging when
the body is missing, then pass the bytes into SyncStateSet::decode and handle
its Result via unwrap_or_default or proper error propagation; ensure you log the
issue and avoid unwrap so the broker does not crash on malformed/missing bodies.

let response = RemotingCommand::create_response_command();

info!(
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
request_header.as_ref().expect("null")
);

Comment on lines +51 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Header decode + logging can panic; handle errors without crashing.

request_header.as_ref().expect("null") will panic on decode error before you branch on Ok. Avoid panics in request handlers; return an error response and log.

-use tracing::info;
+use tracing::{info, error};
@@
-        let request_header =
-            request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();
+        let mut response = RemotingCommand::create_response_command();
+        let request_header = match request
+            .decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>()
+        {
+            Ok(h) => h,
+            Err(e) => {
+                error!("Failed to decode NotifyBrokerRoleChangedRequestHeader: {}", e);
+                return Some(response.set_code(ResponseCode::SystemError));
+            }
+        };
@@
-        let response = RemotingCommand::create_response_command();
-
-        info!(
-            "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
-            request_header.as_ref().expect("null")
-        );
+        info!(
+            "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
+            request_header
+        );

Committable suggestion skipped: line range outside the PR's diff.

if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
if let Ok(request_header) = request_header {
match replicas_mangesr
.change_broker_role(
request_header.master_broker_id,
request_header.master_address,
request_header.master_epoch,
request_header.sync_state_set_epoch,
sync_state_set_info.get_sync_state_set(),
Comment on lines +68 to +72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

🧩 Analysis chain

Validate required header fields before applying changes.

If these header fields are Option, enforce presence for required ones (e.g., master_address, master_epoch, sync_state_set_epoch) and return a client/error code when missing.

If master_address/master_epoch/sync_state_set_epoch are optional in the type but required semantically, add a check after decoding and before invoking change_broker_role.


🏁 Script executed:

#!/bin/bash
# Inspect header field types/attributes to confirm which are Option and #[required]
rg -n -C2 'struct\s+NotifyBrokerRoleChangedRequestHeader' rocketmq-remoting/src/protocol/header
rg -n -C2 '#\[required\]' rocketmq-remoting/src/protocol/header

Length of output: 61219


Validate required header fields before applying changes.

NotifyBrokerRoleChangedRequestHeader marks master_address, master_epoch, sync_state_set_epoch, master_broker_id as #[required] but their types are Option; enforce presence (match/ok_or) and return an appropriate client/error code when any are missing before calling change_broker_role. Fix in rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (around lines 68–72).

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 68–72, the handler currently passes Option<T> fields
(master_address, master_epoch, sync_state_set_epoch, master_broker_id) directly
into change_broker_role; validate each required header field by matching or
using ok_or to convert missing Options into an early error/response (returning
the appropriate client/error code) before calling change_broker_role.
Specifically, extract master_broker_id, master_address, master_epoch, and
sync_state_set_epoch from the request_header, return a client error if any is
None (with a clear error code/message), and only then call change_broker_role
with the unwrapped values.

)
.await
{
Ok(_) => {}
Err(e) => {
panic!("Failed to call method change_broker_role: {}", e);
}
}
}
}

Some(response.set_code(ResponseCode::Success))
}
}
1 change: 1 addition & 0 deletions rocketmq-remoting/src/protocol/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod request;
pub mod response;
pub mod set_message_request_mode_request_body;
pub mod subscription_group_wrapper;
pub mod sync_state_set_body;
pub mod timer_metrics_serialize_wrapper;
pub mod topic;
pub mod topic_info_wrapper;
Expand Down
38 changes: 38 additions & 0 deletions rocketmq-remoting/src/protocol/body/sync_state_set_body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::collections::HashSet;

use serde::Deserialize;
use serde::Serialize;

#[derive(Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct SyncStateSet {
sync_state_set: Option<HashSet<i64>>,
sync_state_set_epoch: i32,
}

impl SyncStateSet {
pub fn get_sync_state_set(&self) -> Option<&HashSet<i64>> {
self.sync_state_set.as_ref()
}

pub fn get_sync_state_set_epoch(&self) -> i32 {
self.sync_state_set_epoch
}
}
1 change: 1 addition & 0 deletions rocketmq-remoting/src/protocol/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod message_operation_header;
pub mod namesrv;
pub mod notification_request_header;
pub mod notification_response_header;
pub mod notify_broker_role_change_request_header;
pub mod notify_consumer_ids_changed_request_header;
pub mod pop_message_request_header;
pub mod pop_message_response_header;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::fmt::Display;

use cheetah_string::CheetahString;
use rocketmq_macros::RequestHeaderCodec;
use serde::Deserialize;
use serde::Serialize;

#[derive(Serialize, Deserialize, Debug, RequestHeaderCodec)]
#[serde(rename_all = "camelCase")]
pub struct NotifyBrokerRoleChangedRequestHeader {
#[required]
pub master_address: Option<CheetahString>,

#[required]
pub master_epoch: Option<i32>,

#[required]
pub sync_state_set_epoch: Option<i32>,

#[required]
pub master_broker_id: Option<u64>,
}

impl Display for NotifyBrokerRoleChangedRequestHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"(master_address={:?}, master_epoch={:?}, sync_state_set_epoch={:?}, \
master_broker_id={:?})",
self.master_address,
self.master_epoch,
self.sync_state_set_epoch,
self.master_broker_id
)
}
}
Loading