Skip to content

Conversation

watchgou
Copy link
Contributor

@watchgou watchgou commented Sep 17, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4059

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features
    • Admin pipeline now accepts and handles broker role-change notifications.
    • Brokers update replica role and synchronization state when notified.
    • Protocol extended to include role-change request headers and a sync-state payload (camelCase JSON).
    • Notifications produce success responses and informational logging to improve observability.

Copy link
Contributor

coderabbitai bot commented Sep 17, 2025

Walkthrough

Adds broker role-change notification support: new request header and SyncStateSet body types, a NotifyBrokerRoleChangeHandler wired into AdminBrokerProcessor, a mutable replicas_manager accessor, and a ReplicasManager async stub to apply role changes.

Changes

Cohort / File(s) Summary
Broker runtime accessor
rocketmq-broker/src/broker_runtime.rs
Adds replicas_manager_mut() returning &mut Option<ReplicasManager>; complements existing immutable accessor.
Replicas manager control API
rocketmq-broker/src/controller/replicas_manager.rs
Adds async change_broker_role(...) -> rocketmq_error::RocketMQResult<()> stub (returns Ok(())); imports HashSet and CheetahString.
Admin processor wiring
rocketmq-broker/src/processor/admin_broker_processor.rs, rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
Introduces NotifyBrokerRoleChangeHandler<MS>, stores it in AdminBrokerProcessor, constructs it in new, and routes RequestCode::NotifyBrokerRoleChanged to the handler. Handler decodes header/body (SyncStateSet), logs, optionally calls replicas_manager.change_broker_role(...).await, and returns a success RemotingCommand.
Protocol body exposure
rocketmq-remoting/src/protocol/body.rs, rocketmq-remoting/src/protocol/body/sync_state_set_body.rs
Exposes sync_state_set_body module. Adds SyncStateSet with serde camelCase, derives, and getters for the optional HashSet<i64> and epoch.
Protocol header exposure
rocketmq-remoting/src/protocol/header.rs, rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs
Exposes notify_broker_role_change_request_header module. Adds NotifyBrokerRoleChangedRequestHeader (serde camelCase, RequestHeaderCodec) with four optional fields and a Display impl.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Client
  participant ABP as AdminBrokerProcessor
  participant H as NotifyBrokerRoleChangeHandler
  participant BR as BrokerRuntimeInner
  participant RM as ReplicasManager

  C->>ABP: RemotingCommand(RequestCode::NotifyBrokerRoleChanged)
  ABP->>H: notify_broker_role_changed(request)
  H->>H: decode header & body (SyncStateSet)
  H->>BR: replicas_manager_mut()
  alt replicas_manager present
    H->>RM: change_broker_role(master_id, addr, epoch, sss_epoch, set)
    RM-->>H: Ok(())
    H-->>ABP: RemotingCommand(Success)
  else no replicas_manager
    H-->>ABP: RemotingCommand(Success)  %% no-op
  end
  ABP-->>C: Response(Success)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

I twitch my whiskers, tap a key,
announce a master change with glee.
Headers parsed and sets in tow,
I nudge the replicas — ready, go!
A joyful hop through code I flee 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The title "notify broker role changed" directly matches the primary change in this PR—adding support for NotifyBrokerRoleChanged notifications (new handler, protocol header/body types, and wiring into the admin processor and replicas manager). It is concise and clearly communicates the main intent to a reviewer. The phrasing is a fragment and not capitalized, but it remains unambiguous and relevant to the changeset.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@watchgou 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@watchgou 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link

codecov bot commented Sep 17, 2025

Codecov Report

❌ Patch coverage is 0% with 68 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.49%. Comparing base (abcff44) to head (25e5a6d).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...ker_processor/notify_broker_role_change_handler.rs 0.00% 37 Missing ⚠️
rocketmq-broker/src/controller/replicas_manager.rs 0.00% 10 Missing ⚠️
...tmq-broker/src/processor/admin_broker_processor.rs 0.00% 6 Missing ⚠️
...-remoting/src/protocol/body/sync_state_set_body.rs 0.00% 6 Missing ⚠️
...header/notify_broker_role_change_request_header.rs 0.00% 6 Missing ⚠️
rocketmq-broker/src/broker_runtime.rs 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4058      +/-   ##
==========================================
- Coverage   26.51%   26.49%   -0.03%     
==========================================
  Files         571      574       +3     
  Lines       81071    81139      +68     
==========================================
+ Hits        21495    21496       +1     
- Misses      59576    59643      +67     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (10)
rocketmq-remoting/src/protocol/header.rs (1)

55-55: Name consistency: use “role_changed” everywhere or add a re-export.

Module is notify_broker_role_change_request_header while the type and RequestCode use “RoleChanged”. Consider aligning names or adding a convenient re-export to reduce ambiguity.

Example:

 pub mod notify_broker_role_change_request_header;
+pub use notify_broker_role_change_request_header::NotifyBrokerRoleChangedRequestHeader;
rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs (3)

25-39: Required fields wrapped in Option: consider making them non-Option.

All fields are marked #[required] yet typed as Option. If the header codec guarantees presence, prefer concrete types to avoid needless unwraps downstream.

-    pub master_address: Option<CheetahString>,
+    pub master_address: CheetahString,
-    pub master_epoch: Option<i32>,
+    pub master_epoch: i32,
-    pub sync_state_set_epoch: Option<i32>,
+    pub sync_state_set_epoch: i32,
-    pub master_broker_id: Option<u64>,
+    pub master_broker_id: u64,

If other parts rely on Option, keep as-is; otherwise this simplifies usage.


25-26: Derive Clone for ergonomics.

These headers are commonly cloned for logging/routing. Adding Clone is cheap and handy.

-#[derive(Serialize, Deserialize, Debug, RequestHeaderCodec)]
+#[derive(Serialize, Deserialize, Debug, Clone, RequestHeaderCodec)]

41-53: Display formatting is fine; optionally prefer Display over Debug for fields.

If you switch fields to non-Option, consider {} with Display impls for cleaner logs; current {:?} is acceptable.

rocketmq-broker/src/broker_runtime.rs (1)

1851-1854: LGTM: mutable accessor added.

Matches existing accessor patterns. If you anticipate frequent unchecked use, consider adding an replicas_manager_unchecked_mut() for symmetry with other fields.

Example to add elsewhere in this impl:

#[inline]
pub fn replicas_manager_unchecked_mut(&mut self) -> &mut ReplicasManager {
    unsafe { self.replicas_manager.as_mut().unwrap_unchecked() }
}
rocketmq-remoting/src/protocol/body/sync_state_set_body.rs (3)

31-33: Avoid cloning the entire HashSet; add a move-out accessor.

Returning Option<HashSet<i64>> clones the whole set. Provide an owning accessor to move it out where needed.

 impl SyncStateSet {
-    pub fn get_sync_state_set(&self) -> Option<HashSet<i64>> {
-        self.sync_state_set.clone()
-    }
+    pub fn get_sync_state_set(&self) -> Option<HashSet<i64>> {
+        self.sync_state_set.clone()
+    }
+    /// Prefer this when the caller needs ownership to avoid a clone.
+    pub fn into_sync_state_set(self) -> Option<HashSet<i64>> {
+        self.sync_state_set
+    }
 }

23-28: Skip serializing absent fields to reduce payload size.

sync_state_set is optional; skip it when None.

 #[derive(Debug, Serialize, Deserialize, Default)]
 #[serde(rename_all = "camelCase")]
 pub struct SyncStateSet {
-    sync_state_set: Option<HashSet<i64>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    sync_state_set: Option<HashSet<i64>>,
     sync_state_set_epoch: i32,
 }

30-37: Rust naming nit: drop get_ prefix on accessors.

Idiomatic Rust favors sync_state_set() / sync_state_set_epoch(). Optional, but improves API ergonomics.

rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (2)

28-35: Minor: import and variable naming polish.

  • Import error along with info.
  • Rename replicas_mangesr -> replicas_manager.

70-71: Use the new move-out accessor to avoid cloning.

Once into_sync_state_set() is added, switch to it to avoid an extra allocation.

-                let sync_state_set = sync_state_set_info.get_sync_state_set();
+                let sync_state_set_epoch = sync_state_set_info.get_sync_state_set_epoch();
+                let sync_state_set = sync_state_set_info.into_sync_state_set();
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between abcff44 and e9349f6.

📒 Files selected for processing (8)
  • rocketmq-broker/src/broker_runtime.rs (1 hunks)
  • rocketmq-broker/src/controller/replicas_manager.rs (2 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor.rs (6 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/sync_state_set_body.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (3)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
  • code (578-580)
  • create_response_command (206-210)
rocketmq-broker/src/broker_runtime.rs (2)
  • message_store (1932-1934)
  • new (151-282)
rocketmq-broker/src/processor/admin_broker_processor.rs (1)
  • new (88-125)
rocketmq-broker/src/processor/admin_broker_processor.rs (3)
rocketmq-broker/src/broker_runtime.rs (1)
  • new (151-282)
rocketmq-broker/src/processor/admin_broker_processor/broker_epoch_cache_handler.rs (1)
  • new (36-40)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)
  • new (52-57)
  • new (61-66)
  • request (75-76)
🔇 Additional comments (7)
rocketmq-remoting/src/protocol/body.rs (1)

59-59: LGTM: new body module export.

Export looks correct and consistent with other modules.

rocketmq-broker/src/processor/admin_broker_processor.rs (5)

33-33: LGTM: handler import wired.


45-45: LGTM: module declared.


67-68: LGTM: handler stored in processor.


108-110: LGTM: handler constructed with runtime.


270-274: Add lightweight tracing and ensure handler avoids panics.

Emit a trace before delegating and ensure notify_broker_role_changed handler never uses panic!/unwrap()/expect on decode failures (return an error RemotingCommand instead).

File: rocketmq-broker/src/processor/admin_broker_processor.rs — RequestCode::NotifyBrokerRoleChanged match arm.

Example:

 RequestCode::NotifyBrokerRoleChanged => {
+    tracing::trace!("Handling NotifyBrokerRoleChanged");
     self.notify_broker_role_change_handler
         .notify_broker_role_changed(channel, ctx, request_code, request)
         .await
 }

Automated search couldn't locate the handler implementation (ripgrep skipped files); manually verify notify_broker_role_change_handler for unwrap/expect/panic and convert decode paths to Result-based handling.

rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (1)

44-50: Method doesn’t mutate self directly; can it take &self?

If replicas_manager_mut() requires &mut self, keep as-is. Otherwise prefer &self to reduce borrow constraints.

Comment on lines 1 to 3
use std::collections::HashSet;

use cheetah_string::CheetahString;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

🧩 Analysis chain

Type consistency: broker IDs are u64 but sync set uses i64.

Consider standardizing on u64 for replica IDs across header/body and this API to avoid lossy casts and surprises.


🏁 Script executed:

#!/bin/bash
# Find all occurrences of SyncStateSet and its element type usage.
rg -n -C2 'SyncStateSet|sync_state_set' --glob '!**/target/**'
rg -n -C2 'HashSet<i64>|HashSet<u64>' --glob '!**/target/**'

Length of output: 297


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Listing repo root (for debugging why rg skipped files):"
ls -la

echo
echo "Searching for SyncStateSet, sync_state_set, HashSet<i64>, HashSet<u64>, and related type names..."
rg -n -S --hidden --no-ignore --glob '!target/**' '\b(SyncStateSet|sync_state_set|HashSet<\s*i64\s*>|HashSet<\s*u64\s*>)\b' || true

echo
echo "Searching for type aliases / broker/replica id identifiers (type SyncStateSet, BrokerId, replica_id, ReplicaId):"
rg -n -S --hidden --no-ignore --glob '!target/**' '\b(type\s+SyncStateSet|type\s+\w+\s*=\s*HashSet|BrokerId|broker_id|ReplicaId|replica_id)\b' || true

echo
echo "If present, show the start of rocketmq-broker/src/controller/replicas_manager.rs:"
if [ -f rocketmq-broker/src/controller/replicas_manager.rs ]; then
  echo "---- BEGIN FILE: rocketmq-broker/src/controller/replicas_manager.rs (first 200 lines) ----"
  sed -n '1,200p' rocketmq-broker/src/controller/replicas_manager.rs
  echo "---- END FILE SNIPPET ----"
else
  echo "File rocketmq-broker/src/controller/replicas_manager.rs not found."
fi

Length of output: 4797


Change sync_state_set element type to u64 (avoid i64/u64 mismatch)
In rocketmq-broker/src/controller/replicas_manager.rs update change_broker_role signature from _sync_state_set: Option<HashSet> to Option<HashSet> and adjust all callers to prevent lossy casts (broker IDs are u64 elsewhere).

🤖 Prompt for AI Agents
In rocketmq-broker/src/controller/replicas_manager.rs around lines 1-3, change
the change_broker_role parameter type from _sync_state_set: Option<HashSet<i64>>
to Option<HashSet<u64>> and update all callers to pass u64 broker IDs (not i64
casts) to avoid lossy conversions; modify the function signature, any internal
uses (iteration, comparisons, set lookups) to use u64, and update caller sites
to construct HashSet<u64> (or map their existing sets to u64 safely) so broker
IDs remain consistent across the codebase.

Comment on lines 38 to 47
pub async fn change_broker_role(
&mut self,
_new_master_broker_id: Option<u64>,
_new_master_address: Option<CheetahString>,
_new_master_epoch: Option<i32>,
_sync_state_set_epoch: Option<i32>,
_sync_state_set: Option<HashSet<i64>>,
) -> rocketmq_error::RocketMQResult<()> {
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid silent no-op for role changes; at least log or validate inputs.

change_broker_role returns Ok(()) without action. This hides misconfigurations and makes debugging hard.

Add a warning and basic validation to surface misuse until full impl lands:

 pub async fn change_broker_role(
     &mut self,
     _new_master_broker_id: Option<u64>,
     _new_master_address: Option<CheetahString>,
     _new_master_epoch: Option<i32>,
     _sync_state_set_epoch: Option<i32>,
     _sync_state_set: Option<HashSet<i64>>,
 ) -> rocketmq_error::RocketMQResult<()> {
-    Ok(())
+    warn!("ReplicasManager::change_broker_role is not implemented yet; request will be ignored");
+    Ok(())
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn change_broker_role(
&mut self,
_new_master_broker_id: Option<u64>,
_new_master_address: Option<CheetahString>,
_new_master_epoch: Option<i32>,
_sync_state_set_epoch: Option<i32>,
_sync_state_set: Option<HashSet<i64>>,
) -> rocketmq_error::RocketMQResult<()> {
Ok(())
}
pub async fn change_broker_role(
&mut self,
_new_master_broker_id: Option<u64>,
_new_master_address: Option<CheetahString>,
_new_master_epoch: Option<i32>,
_sync_state_set_epoch: Option<i32>,
_sync_state_set: Option<HashSet<i64>>,
) -> rocketmq_error::RocketMQResult<()> {
warn!("ReplicasManager::change_broker_role is not implemented yet; request will be ignored");
Ok(())
}
🤖 Prompt for AI Agents
In rocketmq-broker/src/controller/replicas_manager.rs around lines 38–47, the
change_broker_role function currently does nothing and returns Ok(()); update it
to avoid silently ignoring calls by (1) emitting a warning-level log entry
whenever the function is invoked (include the provided argument values in the
log for debugging), and (2) performing basic validation: require that either all
new-master fields (new_master_broker_id, new_master_address, new_master_epoch)
are Some together or all are None, and require that _sync_state_set_epoch and
_sync_state_set are consistent (both Some or both None) and, if present,
non-empty; if validation fails return an appropriate RocketMQ error instead of
Ok(()). Ensure the logging uses the existing logger/context and do not implement
full role-change logic—this is just to surface misuse until the real
implementation is added.

Comment on lines +51 to +63
let request_header =
request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();

let sync_state_set_info =
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();

let response = RemotingCommand::create_response_command();

info!(
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
request_header.as_ref().expect("null")
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Header decode + logging can panic; handle errors without crashing.

request_header.as_ref().expect("null") will panic on decode error before you branch on Ok. Avoid panics in request handlers; return an error response and log.

-use tracing::info;
+use tracing::{info, error};
@@
-        let request_header =
-            request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();
+        let mut response = RemotingCommand::create_response_command();
+        let request_header = match request
+            .decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>()
+        {
+            Ok(h) => h,
+            Err(e) => {
+                error!("Failed to decode NotifyBrokerRoleChangedRequestHeader: {}", e);
+                return Some(response.set_code(ResponseCode::SystemError));
+            }
+        };
@@
-        let response = RemotingCommand::create_response_command();
-
-        info!(
-            "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
-            request_header.as_ref().expect("null")
-        );
+        info!(
+            "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
+            request_header
+        );

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +55 to +56
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unchecked body access (unwrap) can crash the broker.

Requests without a body will panic here. Decode defensively and default or reject.

-        let sync_state_set_info =
-            SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
+        let sync_state_set_info = match request.get_body() {
+            Some(body) => SyncStateSet::decode(body).unwrap_or_default(),
+            None => SyncStateSet::default(),
+        };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
let sync_state_set_info = match request.get_body() {
Some(body) => SyncStateSet::decode(body).unwrap_or_default(),
None => SyncStateSet::default(),
};
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 55-56, the code calls request.get_body().unwrap() which will panic
if the request has no body; change this to defensively handle the Option by
first checking request.get_body() and either using a default SyncStateSet (e.g.,
SyncStateSet::default()) or returning an appropriate error response/logging when
the body is missing, then pass the bytes into SyncStateSet::decode and handle
its Result via unwrap_or_default or proper error propagation; ensure you log the
issue and avoid unwrap so the broker does not crash on malformed/missing bodies.

Comment on lines 57 to 90
let response = RemotingCommand::create_response_command();

info!(
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
request_header.as_ref().expect("null")
);

if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
if let Ok(request_header) = request_header {
let master_broker_id = request_header.master_broker_id;
let master_address = request_header.master_address;
let master_epoch = request_header.master_epoch;
let sync_state_set_epoch = request_header.sync_state_set_epoch;
let sync_state_set = sync_state_set_info.get_sync_state_set();
match replicas_mangesr
.change_broker_role(
master_broker_id,
master_address,
master_epoch,
sync_state_set_epoch,
sync_state_set,
)
.await
{
Ok(_) => {}
Err(e) => {
panic!("Failed to call method change_broker_role: {}", e);
}
}
}
}

Some(response.set_code(ResponseCode::Success))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Always returning Success hides failures; propagate error codes.

create_response_command() already sets Success; you override to Success again regardless of failures. After adopting the above error paths, just return the response.

-        Some(response.set_code(ResponseCode::Success))
+        Some(response)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let response = RemotingCommand::create_response_command();
info!(
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
request_header.as_ref().expect("null")
);
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
if let Ok(request_header) = request_header {
let master_broker_id = request_header.master_broker_id;
let master_address = request_header.master_address;
let master_epoch = request_header.master_epoch;
let sync_state_set_epoch = request_header.sync_state_set_epoch;
let sync_state_set = sync_state_set_info.get_sync_state_set();
match replicas_mangesr
.change_broker_role(
master_broker_id,
master_address,
master_epoch,
sync_state_set_epoch,
sync_state_set,
)
.await
{
Ok(_) => {}
Err(e) => {
panic!("Failed to call method change_broker_role: {}", e);
}
}
}
}
Some(response.set_code(ResponseCode::Success))
}
let response = RemotingCommand::create_response_command();
info!(
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
request_header.as_ref().expect("null")
);
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
if let Ok(request_header) = request_header {
let master_broker_id = request_header.master_broker_id;
let master_address = request_header.master_address;
let master_epoch = request_header.master_epoch;
let sync_state_set_epoch = request_header.sync_state_set_epoch;
let sync_state_set = sync_state_set_info.get_sync_state_set();
match replicas_mangesr
.change_broker_role(
master_broker_id,
master_address,
master_epoch,
sync_state_set_epoch,
sync_state_set,
)
.await
{
Ok(_) => {}
Err(e) => {
panic!("Failed to call method change_broker_role: {}", e);
}
}
}
}
Some(response)
}
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 57 to 90, the handler always returns Success
(create_response_command already defaults to Success) and even panics on
change_broker_role errors; change this so you do not unconditionally override
Success—if change_broker_role returns Err, set the response code to an
appropriate failure ResponseCode and include the error message (e.g., via
response.set_remark or similar) instead of panicking; on success just return the
response as-is. Ensure you remove the unconditional
Some(response.set_code(ResponseCode::Success)) and instead return the response
after you set the proper code/remark per outcome.

Comment on lines 64 to 86
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
if let Ok(request_header) = request_header {
let master_broker_id = request_header.master_broker_id;
let master_address = request_header.master_address;
let master_epoch = request_header.master_epoch;
let sync_state_set_epoch = request_header.sync_state_set_epoch;
let sync_state_set = sync_state_set_info.get_sync_state_set();
match replicas_mangesr
.change_broker_role(
master_broker_id,
master_address,
master_epoch,
sync_state_set_epoch,
sync_state_set,
)
.await
{
Ok(_) => {}
Err(e) => {
panic!("Failed to call method change_broker_role: {}", e);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Don’t panic on replica update failures; return an error response and log. Also fix typo.

  • replicas_mangesr typo.
  • panic! on .await error will take down the handler thread/process.
-        if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
-            if let Ok(request_header) = request_header {
-                let master_broker_id = request_header.master_broker_id;
-                let master_address = request_header.master_address;
-                let master_epoch = request_header.master_epoch;
-                let sync_state_set_epoch = request_header.sync_state_set_epoch;
-                let sync_state_set = sync_state_set_info.get_sync_state_set();
-                match replicas_mangesr
-                    .change_broker_role(
-                        master_broker_id,
-                        master_address,
-                        master_epoch,
-                        sync_state_set_epoch,
-                        sync_state_set,
-                    )
-                    .await
-                {
-                    Ok(_) => {}
-                    Err(e) => {
-                        panic!("Failed to call method change_broker_role: {}", e);
-                    }
-                }
-            }
-        }
+        if let Some(replicas_manager) = self.broker_runtime_inner.replicas_manager_mut() {
+            let master_broker_id = request_header.master_broker_id;
+            let master_address = request_header.master_address;
+            let master_epoch = request_header.master_epoch;
+            let sync_state_set_epoch = request_header.sync_state_set_epoch;
+            // Prefer move-out to avoid cloning if you add `into_sync_state_set()`.
+            let sync_state_set = sync_state_set_info.get_sync_state_set();
+            if let Err(e) = replicas_manager
+                .change_broker_role(
+                    master_broker_id,
+                    master_address,
+                    master_epoch,
+                    sync_state_set_epoch,
+                    sync_state_set,
+                )
+                .await
+            {
+                error!("Failed to change_broker_role: {}", e);
+                return Some(response.set_code(ResponseCode::SystemError));
+            }
+        } else {
+            error!("ReplicasManager not initialized; cannot apply role change");
+            return Some(response.set_code(ResponseCode::SystemError));
+        }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 64 to 86, fix the typo `replicas_mangesr` to `replicas_manager` and
remove the panic on change_broker_role failure; instead log the error (include
context and the error) and return an appropriate error response/result to the
caller (e.g., set the response code/remark or propagate a Err variant) so the
handler does not abort the thread/process.

Comment on lines 66 to 70
let master_broker_id = request_header.master_broker_id;
let master_address = request_header.master_address;
let master_epoch = request_header.master_epoch;
let sync_state_set_epoch = request_header.sync_state_set_epoch;
let sync_state_set = sync_state_set_info.get_sync_state_set();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Confirm header field nullability and enforce required fields.

If NotifyBrokerRoleChangedRequestHeader uses Option<...> fields (as commonly done for headers), unwrap with validation and fail fast when missing. Otherwise, ignore this.

If the header fields are Option, use this pattern:

+        // If header fields are Option<...>
+        let (Some(master_broker_id), Some(master_address), Some(master_epoch), Some(sync_state_set_epoch)) = (
+            request_header.master_broker_id,
+            request_header.master_address,
+            request_header.master_epoch,
+            request_header.sync_state_set_epoch,
+        ) else {
+            error!("Missing required header fields in NotifyBrokerRoleChangedRequestHeader");
+            return Some(response.set_code(ResponseCode::SystemError));
+        };

🏁 Script executed:

#!/bin/bash
# Inspect header and response codes to validate field types and usable error codes.
rg -n -C2 'struct\s+NotifyBrokerRoleChangedRequestHeader' rocketmq-remoting/src/protocol/header
rg -n -C2 'pub\s+enum\s+ResponseCode' rocketmq-remoting/src

Length of output: 972


Validate and unwrap required header Option fields before use.

NotifyBrokerRoleChangedRequestHeader fields are Option (master_address is annotated #[required]). Unwrap/validate required/used fields and fail fast (return ResponseCode::SystemError) if any are missing.

Location: rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (~lines 66–70)

Suggested fix:

+        let (Some(master_broker_id), Some(master_address), Some(master_epoch), Some(sync_state_set_epoch)) = (
+            request_header.master_broker_id,
+            request_header.master_address,
+            request_header.master_epoch,
+            request_header.sync_state_set_epoch,
+        ) else {
+            error!("Missing required header fields in NotifyBrokerRoleChangedRequestHeader");
+            return Some(response.set_code(ResponseCode::SystemError));
+        };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let master_broker_id = request_header.master_broker_id;
let master_address = request_header.master_address;
let master_epoch = request_header.master_epoch;
let sync_state_set_epoch = request_header.sync_state_set_epoch;
let sync_state_set = sync_state_set_info.get_sync_state_set();
let (Some(master_broker_id), Some(master_address), Some(master_epoch), Some(sync_state_set_epoch)) = (
request_header.master_broker_id,
request_header.master_address,
request_header.master_epoch,
request_header.sync_state_set_epoch,
) else {
error!("Missing required header fields in NotifyBrokerRoleChangedRequestHeader");
return Some(response.set_code(ResponseCode::SystemError));
};
let sync_state_set = sync_state_set_info.get_sync_state_set();

@rocketmq-rust-bot
Copy link
Collaborator

🔊@watchgou 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (5)

51-53: Header decode can panic; fail fast and return an error response.

Avoid panicking in request path; handle decode Result and log.

-        let request_header =
-            request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();
+        let response = RemotingCommand::create_response_command();
+        let request_header = match request
+            .decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>()
+        {
+            Ok(h) => h,
+            Err(e) => {
+                error!("Failed to decode NotifyBrokerRoleChangedRequestHeader: {}", e);
+                return Some(response.set_code(ResponseCode::SystemError));
+            }
+        };

54-56: Unchecked body unwrap can crash the broker; decode defensively.

Requests may have no body; don’t unwrap().

-        let sync_state_set_info =
-            SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
+        let sync_state_set_info = match request.get_body() {
+            Some(body) => SyncStateSet::decode(body).unwrap_or_default(),
+            None => SyncStateSet::default(),
+        };

57-63: Logging currently uses expect on a Result; log the decoded header instead.

After handling decode above, remove the panic‑prone expect.

-        let response = RemotingCommand::create_response_command();
-
         info!(
             "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
-            request_header.as_ref().expect("null")
+            request_header
         );

64-82: Don’t panic on role-change failures; handle missing ReplicasManager; fix variable name.

  • Rename replicas_mangesr → replicas_manager.
  • If manager is None, return SystemError.
  • On .await error, log and return error instead of panic!.
-        if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
-            if let Ok(request_header) = request_header {
-                match replicas_mangesr
-                    .change_broker_role(
-                        request_header.master_broker_id,
-                        request_header.master_address,
-                        request_header.master_epoch,
-                        request_header.sync_state_set_epoch,
-                        sync_state_set_info.get_sync_state_set(),
-                    )
-                    .await
-                {
-                    Ok(_) => {}
-                    Err(e) => {
-                        panic!("Failed to call method change_broker_role: {}", e);
-                    }
-                }
-            }
-        }
+        if let Some(replicas_manager) = self.broker_runtime_inner.replicas_manager_mut() {
+            if let Err(e) = replicas_manager
+                .change_broker_role(
+                    request_header.master_broker_id,
+                    request_header.master_address,
+                    request_header.master_epoch,
+                    request_header.sync_state_set_epoch,
+                    sync_state_set_info.get_sync_state_set(),
+                )
+                .await
+            {
+                error!("Failed to change_broker_role: {}", e);
+                return Some(response.set_code(ResponseCode::SystemError));
+            }
+        } else {
+            error!("ReplicasManager not initialized; cannot apply role change");
+            return Some(response.set_code(ResponseCode::SystemError));
+        }

84-85: Don’t force Success; let earlier paths set errors.

create_response_command() is already Success; just return response.

-        Some(response.set_code(ResponseCode::Success))
+        Some(response)
🧹 Nitpick comments (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (1)

28-28: Import error-level logging for proper error handling.

You use error! in the fixes below; bring it into scope.

-use tracing::info;
+use tracing::{info, error};
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e9349f6 and 25e5a6d.

📒 Files selected for processing (3)
  • rocketmq-broker/src/controller/replicas_manager.rs (2 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/sync_state_set_body.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • rocketmq-remoting/src/protocol/body/sync_state_set_body.rs
  • rocketmq-broker/src/controller/replicas_manager.rs
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (3)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
  • code (578-580)
  • create_response_command (206-210)
rocketmq-broker/src/broker_runtime.rs (2)
  • message_store (1932-1934)
  • new (151-282)
rocketmq-broker/src/processor/admin_broker_processor.rs (1)
  • new (88-125)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build
  • GitHub Check: test
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: auto-approve

Comment on lines +68 to +72
request_header.master_broker_id,
request_header.master_address,
request_header.master_epoch,
request_header.sync_state_set_epoch,
sync_state_set_info.get_sync_state_set(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

🧩 Analysis chain

Validate required header fields before applying changes.

If these header fields are Option, enforce presence for required ones (e.g., master_address, master_epoch, sync_state_set_epoch) and return a client/error code when missing.

If master_address/master_epoch/sync_state_set_epoch are optional in the type but required semantically, add a check after decoding and before invoking change_broker_role.


🏁 Script executed:

#!/bin/bash
# Inspect header field types/attributes to confirm which are Option and #[required]
rg -n -C2 'struct\s+NotifyBrokerRoleChangedRequestHeader' rocketmq-remoting/src/protocol/header
rg -n -C2 '#\[required\]' rocketmq-remoting/src/protocol/header

Length of output: 61219


Validate required header fields before applying changes.

NotifyBrokerRoleChangedRequestHeader marks master_address, master_epoch, sync_state_set_epoch, master_broker_id as #[required] but their types are Option; enforce presence (match/ok_or) and return an appropriate client/error code when any are missing before calling change_broker_role. Fix in rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (around lines 68–72).

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 68–72, the handler currently passes Option<T> fields
(master_address, master_epoch, sync_state_set_epoch, master_broker_id) directly
into change_broker_role; validate each required header field by matching or
using ok_or to convert missing Options into an early error/response (returning
the appropriate client/error code) before calling change_broker_role.
Specifically, extract master_broker_id, master_address, master_epoch, and
sync_state_set_epoch from the request_header, return a client error if any is
None (with a clear error code/message), and only then call change_broker_role
with the unwrapped values.

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Sep 18, 2025
Copy link
Owner

@mxsm mxsm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mxsm mxsm merged commit b4d8a02 into mxsm:main Sep 18, 2025
15 of 22 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Sep 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Notify broker role changed(RequstCode:1008)

4 participants