-
Notifications
You must be signed in to change notification settings - Fork 175
notify broker role changed #4058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds broker role-change notification support: new request header and SyncStateSet body types, a NotifyBrokerRoleChangeHandler wired into AdminBrokerProcessor, a mutable replicas_manager accessor, and a ReplicasManager async stub to apply role changes. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant ABP as AdminBrokerProcessor
participant H as NotifyBrokerRoleChangeHandler
participant BR as BrokerRuntimeInner
participant RM as ReplicasManager
C->>ABP: RemotingCommand(RequestCode::NotifyBrokerRoleChanged)
ABP->>H: notify_broker_role_changed(request)
H->>H: decode header & body (SyncStateSet)
H->>BR: replicas_manager_mut()
alt replicas_manager present
H->>RM: change_broker_role(master_id, addr, epoch, sss_epoch, set)
RM-->>H: Ok(())
H-->>ABP: RemotingCommand(Success)
else no replicas_manager
H-->>ABP: RemotingCommand(Success) %% no-op
end
ABP-->>C: Response(Success)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@watchgou 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
🔊@watchgou 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4058 +/- ##
==========================================
- Coverage 26.51% 26.49% -0.03%
==========================================
Files 571 574 +3
Lines 81071 81139 +68
==========================================
+ Hits 21495 21496 +1
- Misses 59576 59643 +67 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Nitpick comments (10)
rocketmq-remoting/src/protocol/header.rs (1)
55-55
: Name consistency: use “role_changed” everywhere or add a re-export.Module is
notify_broker_role_change_request_header
while the type and RequestCode use “RoleChanged”. Consider aligning names or adding a convenient re-export to reduce ambiguity.Example:
pub mod notify_broker_role_change_request_header; +pub use notify_broker_role_change_request_header::NotifyBrokerRoleChangedRequestHeader;
rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs (3)
25-39
: Required fields wrapped in Option: consider making them non-Option.All fields are marked #[required] yet typed as Option. If the header codec guarantees presence, prefer concrete types to avoid needless unwraps downstream.
- pub master_address: Option<CheetahString>, + pub master_address: CheetahString, - pub master_epoch: Option<i32>, + pub master_epoch: i32, - pub sync_state_set_epoch: Option<i32>, + pub sync_state_set_epoch: i32, - pub master_broker_id: Option<u64>, + pub master_broker_id: u64,If other parts rely on Option, keep as-is; otherwise this simplifies usage.
25-26
: Derive Clone for ergonomics.These headers are commonly cloned for logging/routing. Adding Clone is cheap and handy.
-#[derive(Serialize, Deserialize, Debug, RequestHeaderCodec)] +#[derive(Serialize, Deserialize, Debug, Clone, RequestHeaderCodec)]
41-53
: Display formatting is fine; optionally prefer Display over Debug for fields.If you switch fields to non-Option, consider
{}
with Display impls for cleaner logs; current{:?}
is acceptable.rocketmq-broker/src/broker_runtime.rs (1)
1851-1854
: LGTM: mutable accessor added.Matches existing accessor patterns. If you anticipate frequent unchecked use, consider adding an
replicas_manager_unchecked_mut()
for symmetry with other fields.Example to add elsewhere in this impl:
#[inline] pub fn replicas_manager_unchecked_mut(&mut self) -> &mut ReplicasManager { unsafe { self.replicas_manager.as_mut().unwrap_unchecked() } }rocketmq-remoting/src/protocol/body/sync_state_set_body.rs (3)
31-33
: Avoid cloning the entire HashSet; add a move-out accessor.Returning
Option<HashSet<i64>>
clones the whole set. Provide an owning accessor to move it out where needed.impl SyncStateSet { - pub fn get_sync_state_set(&self) -> Option<HashSet<i64>> { - self.sync_state_set.clone() - } + pub fn get_sync_state_set(&self) -> Option<HashSet<i64>> { + self.sync_state_set.clone() + } + /// Prefer this when the caller needs ownership to avoid a clone. + pub fn into_sync_state_set(self) -> Option<HashSet<i64>> { + self.sync_state_set + } }
23-28
: Skip serializing absent fields to reduce payload size.
sync_state_set
is optional; skip it whenNone
.#[derive(Debug, Serialize, Deserialize, Default)] #[serde(rename_all = "camelCase")] pub struct SyncStateSet { - sync_state_set: Option<HashSet<i64>>, + #[serde(skip_serializing_if = "Option::is_none")] + sync_state_set: Option<HashSet<i64>>, sync_state_set_epoch: i32, }
30-37
: Rust naming nit: dropget_
prefix on accessors.Idiomatic Rust favors
sync_state_set()
/sync_state_set_epoch()
. Optional, but improves API ergonomics.rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (2)
28-35
: Minor: import and variable naming polish.
- Import
error
along withinfo
.- Rename
replicas_mangesr
->replicas_manager
.
70-71
: Use the new move-out accessor to avoid cloning.Once
into_sync_state_set()
is added, switch to it to avoid an extra allocation.- let sync_state_set = sync_state_set_info.get_sync_state_set(); + let sync_state_set_epoch = sync_state_set_info.get_sync_state_set_epoch(); + let sync_state_set = sync_state_set_info.into_sync_state_set();
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
rocketmq-broker/src/broker_runtime.rs
(1 hunks)rocketmq-broker/src/controller/replicas_manager.rs
(2 hunks)rocketmq-broker/src/processor/admin_broker_processor.rs
(6 hunks)rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
(1 hunks)rocketmq-remoting/src/protocol/body.rs
(1 hunks)rocketmq-remoting/src/protocol/body/sync_state_set_body.rs
(1 hunks)rocketmq-remoting/src/protocol/header.rs
(1 hunks)rocketmq-remoting/src/protocol/header/notify_broker_role_change_request_header.rs
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (3)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
code
(578-580)create_response_command
(206-210)rocketmq-broker/src/broker_runtime.rs (2)
message_store
(1932-1934)new
(151-282)rocketmq-broker/src/processor/admin_broker_processor.rs (1)
new
(88-125)
rocketmq-broker/src/processor/admin_broker_processor.rs (3)
rocketmq-broker/src/broker_runtime.rs (1)
new
(151-282)rocketmq-broker/src/processor/admin_broker_processor/broker_epoch_cache_handler.rs (1)
new
(36-40)rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)
new
(52-57)new
(61-66)request
(75-76)
🔇 Additional comments (7)
rocketmq-remoting/src/protocol/body.rs (1)
59-59
: LGTM: new body module export.Export looks correct and consistent with other modules.
rocketmq-broker/src/processor/admin_broker_processor.rs (5)
33-33
: LGTM: handler import wired.
45-45
: LGTM: module declared.
67-68
: LGTM: handler stored in processor.
108-110
: LGTM: handler constructed with runtime.
270-274
: Add lightweight tracing and ensure handler avoids panics.Emit a trace before delegating and ensure notify_broker_role_changed handler never uses panic!/unwrap()/expect on decode failures (return an error RemotingCommand instead).
File: rocketmq-broker/src/processor/admin_broker_processor.rs — RequestCode::NotifyBrokerRoleChanged match arm.
Example:
RequestCode::NotifyBrokerRoleChanged => { + tracing::trace!("Handling NotifyBrokerRoleChanged"); self.notify_broker_role_change_handler .notify_broker_role_changed(channel, ctx, request_code, request) .await }
Automated search couldn't locate the handler implementation (ripgrep skipped files); manually verify notify_broker_role_change_handler for unwrap/expect/panic and convert decode paths to Result-based handling.
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (1)
44-50
: Method doesn’t mutateself
directly; can it take&self
?If
replicas_manager_mut()
requires&mut self
, keep as-is. Otherwise prefer&self
to reduce borrow constraints.
use std::collections::HashSet; | ||
|
||
use cheetah_string::CheetahString; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
🧩 Analysis chain
Type consistency: broker IDs are u64 but sync set uses i64.
Consider standardizing on u64 for replica IDs across header/body and this API to avoid lossy casts and surprises.
🏁 Script executed:
#!/bin/bash
# Find all occurrences of SyncStateSet and its element type usage.
rg -n -C2 'SyncStateSet|sync_state_set' --glob '!**/target/**'
rg -n -C2 'HashSet<i64>|HashSet<u64>' --glob '!**/target/**'
Length of output: 297
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Listing repo root (for debugging why rg skipped files):"
ls -la
echo
echo "Searching for SyncStateSet, sync_state_set, HashSet<i64>, HashSet<u64>, and related type names..."
rg -n -S --hidden --no-ignore --glob '!target/**' '\b(SyncStateSet|sync_state_set|HashSet<\s*i64\s*>|HashSet<\s*u64\s*>)\b' || true
echo
echo "Searching for type aliases / broker/replica id identifiers (type SyncStateSet, BrokerId, replica_id, ReplicaId):"
rg -n -S --hidden --no-ignore --glob '!target/**' '\b(type\s+SyncStateSet|type\s+\w+\s*=\s*HashSet|BrokerId|broker_id|ReplicaId|replica_id)\b' || true
echo
echo "If present, show the start of rocketmq-broker/src/controller/replicas_manager.rs:"
if [ -f rocketmq-broker/src/controller/replicas_manager.rs ]; then
echo "---- BEGIN FILE: rocketmq-broker/src/controller/replicas_manager.rs (first 200 lines) ----"
sed -n '1,200p' rocketmq-broker/src/controller/replicas_manager.rs
echo "---- END FILE SNIPPET ----"
else
echo "File rocketmq-broker/src/controller/replicas_manager.rs not found."
fi
Length of output: 4797
Change sync_state_set element type to u64 (avoid i64/u64 mismatch)
In rocketmq-broker/src/controller/replicas_manager.rs update change_broker_role signature from _sync_state_set: Option<HashSet> to Option<HashSet> and adjust all callers to prevent lossy casts (broker IDs are u64 elsewhere).
🤖 Prompt for AI Agents
In rocketmq-broker/src/controller/replicas_manager.rs around lines 1-3, change
the change_broker_role parameter type from _sync_state_set: Option<HashSet<i64>>
to Option<HashSet<u64>> and update all callers to pass u64 broker IDs (not i64
casts) to avoid lossy conversions; modify the function signature, any internal
uses (iteration, comparisons, set lookups) to use u64, and update caller sites
to construct HashSet<u64> (or map their existing sets to u64 safely) so broker
IDs remain consistent across the codebase.
pub async fn change_broker_role( | ||
&mut self, | ||
_new_master_broker_id: Option<u64>, | ||
_new_master_address: Option<CheetahString>, | ||
_new_master_epoch: Option<i32>, | ||
_sync_state_set_epoch: Option<i32>, | ||
_sync_state_set: Option<HashSet<i64>>, | ||
) -> rocketmq_error::RocketMQResult<()> { | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid silent no-op for role changes; at least log or validate inputs.
change_broker_role
returns Ok(()) without action. This hides misconfigurations and makes debugging hard.
Add a warning and basic validation to surface misuse until full impl lands:
pub async fn change_broker_role(
&mut self,
_new_master_broker_id: Option<u64>,
_new_master_address: Option<CheetahString>,
_new_master_epoch: Option<i32>,
_sync_state_set_epoch: Option<i32>,
_sync_state_set: Option<HashSet<i64>>,
) -> rocketmq_error::RocketMQResult<()> {
- Ok(())
+ warn!("ReplicasManager::change_broker_role is not implemented yet; request will be ignored");
+ Ok(())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub async fn change_broker_role( | |
&mut self, | |
_new_master_broker_id: Option<u64>, | |
_new_master_address: Option<CheetahString>, | |
_new_master_epoch: Option<i32>, | |
_sync_state_set_epoch: Option<i32>, | |
_sync_state_set: Option<HashSet<i64>>, | |
) -> rocketmq_error::RocketMQResult<()> { | |
Ok(()) | |
} | |
pub async fn change_broker_role( | |
&mut self, | |
_new_master_broker_id: Option<u64>, | |
_new_master_address: Option<CheetahString>, | |
_new_master_epoch: Option<i32>, | |
_sync_state_set_epoch: Option<i32>, | |
_sync_state_set: Option<HashSet<i64>>, | |
) -> rocketmq_error::RocketMQResult<()> { | |
warn!("ReplicasManager::change_broker_role is not implemented yet; request will be ignored"); | |
Ok(()) | |
} |
🤖 Prompt for AI Agents
In rocketmq-broker/src/controller/replicas_manager.rs around lines 38–47, the
change_broker_role function currently does nothing and returns Ok(()); update it
to avoid silently ignoring calls by (1) emitting a warning-level log entry
whenever the function is invoked (include the provided argument values in the
log for debugging), and (2) performing basic validation: require that either all
new-master fields (new_master_broker_id, new_master_address, new_master_epoch)
are Some together or all are None, and require that _sync_state_set_epoch and
_sync_state_set are consistent (both Some or both None) and, if present,
non-empty; if validation fails return an appropriate RocketMQ error instead of
Ok(()). Ensure the logging uses the existing logger/context and do not implement
full role-change logic—this is just to surface misuse until the real
implementation is added.
let request_header = | ||
request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>(); | ||
|
||
let sync_state_set_info = | ||
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default(); | ||
|
||
let response = RemotingCommand::create_response_command(); | ||
|
||
info!( | ||
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", | ||
request_header.as_ref().expect("null") | ||
); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Header decode + logging can panic; handle errors without crashing.
request_header.as_ref().expect("null")
will panic on decode error before you branch on Ok
. Avoid panics in request handlers; return an error response and log.
-use tracing::info;
+use tracing::{info, error};
@@
- let request_header =
- request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>();
+ let mut response = RemotingCommand::create_response_command();
+ let request_header = match request
+ .decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>()
+ {
+ Ok(h) => h,
+ Err(e) => {
+ error!("Failed to decode NotifyBrokerRoleChangedRequestHeader: {}", e);
+ return Some(response.set_code(ResponseCode::SystemError));
+ }
+ };
@@
- let response = RemotingCommand::create_response_command();
-
- info!(
- "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
- request_header.as_ref().expect("null")
- );
+ info!(
+ "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}",
+ request_header
+ );
Committable suggestion skipped: line range outside the PR's diff.
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unchecked body access (unwrap
) can crash the broker.
Requests without a body will panic here. Decode defensively and default or reject.
- let sync_state_set_info =
- SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default();
+ let sync_state_set_info = match request.get_body() {
+ Some(body) => SyncStateSet::decode(body).unwrap_or_default(),
+ None => SyncStateSet::default(),
+ };
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default(); | |
let sync_state_set_info = match request.get_body() { | |
Some(body) => SyncStateSet::decode(body).unwrap_or_default(), | |
None => SyncStateSet::default(), | |
}; |
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 55-56, the code calls request.get_body().unwrap() which will panic
if the request has no body; change this to defensively handle the Option by
first checking request.get_body() and either using a default SyncStateSet (e.g.,
SyncStateSet::default()) or returning an appropriate error response/logging when
the body is missing, then pass the bytes into SyncStateSet::decode and handle
its Result via unwrap_or_default or proper error propagation; ensure you log the
issue and avoid unwrap so the broker does not crash on malformed/missing bodies.
let response = RemotingCommand::create_response_command(); | ||
|
||
info!( | ||
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", | ||
request_header.as_ref().expect("null") | ||
); | ||
|
||
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { | ||
if let Ok(request_header) = request_header { | ||
let master_broker_id = request_header.master_broker_id; | ||
let master_address = request_header.master_address; | ||
let master_epoch = request_header.master_epoch; | ||
let sync_state_set_epoch = request_header.sync_state_set_epoch; | ||
let sync_state_set = sync_state_set_info.get_sync_state_set(); | ||
match replicas_mangesr | ||
.change_broker_role( | ||
master_broker_id, | ||
master_address, | ||
master_epoch, | ||
sync_state_set_epoch, | ||
sync_state_set, | ||
) | ||
.await | ||
{ | ||
Ok(_) => {} | ||
Err(e) => { | ||
panic!("Failed to call method change_broker_role: {}", e); | ||
} | ||
} | ||
} | ||
} | ||
|
||
Some(response.set_code(ResponseCode::Success)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always returning Success hides failures; propagate error codes.
create_response_command()
already sets Success; you override to Success again regardless of failures. After adopting the above error paths, just return the response.
- Some(response.set_code(ResponseCode::Success))
+ Some(response)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let response = RemotingCommand::create_response_command(); | |
info!( | |
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", | |
request_header.as_ref().expect("null") | |
); | |
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { | |
if let Ok(request_header) = request_header { | |
let master_broker_id = request_header.master_broker_id; | |
let master_address = request_header.master_address; | |
let master_epoch = request_header.master_epoch; | |
let sync_state_set_epoch = request_header.sync_state_set_epoch; | |
let sync_state_set = sync_state_set_info.get_sync_state_set(); | |
match replicas_mangesr | |
.change_broker_role( | |
master_broker_id, | |
master_address, | |
master_epoch, | |
sync_state_set_epoch, | |
sync_state_set, | |
) | |
.await | |
{ | |
Ok(_) => {} | |
Err(e) => { | |
panic!("Failed to call method change_broker_role: {}", e); | |
} | |
} | |
} | |
} | |
Some(response.set_code(ResponseCode::Success)) | |
} | |
let response = RemotingCommand::create_response_command(); | |
info!( | |
"Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", | |
request_header.as_ref().expect("null") | |
); | |
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { | |
if let Ok(request_header) = request_header { | |
let master_broker_id = request_header.master_broker_id; | |
let master_address = request_header.master_address; | |
let master_epoch = request_header.master_epoch; | |
let sync_state_set_epoch = request_header.sync_state_set_epoch; | |
let sync_state_set = sync_state_set_info.get_sync_state_set(); | |
match replicas_mangesr | |
.change_broker_role( | |
master_broker_id, | |
master_address, | |
master_epoch, | |
sync_state_set_epoch, | |
sync_state_set, | |
) | |
.await | |
{ | |
Ok(_) => {} | |
Err(e) => { | |
panic!("Failed to call method change_broker_role: {}", e); | |
} | |
} | |
} | |
} | |
Some(response) | |
} |
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 57 to 90, the handler always returns Success
(create_response_command already defaults to Success) and even panics on
change_broker_role errors; change this so you do not unconditionally override
Success—if change_broker_role returns Err, set the response code to an
appropriate failure ResponseCode and include the error message (e.g., via
response.set_remark or similar) instead of panicking; on success just return the
response as-is. Ensure you remove the unconditional
Some(response.set_code(ResponseCode::Success)) and instead return the response
after you set the proper code/remark per outcome.
if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { | ||
if let Ok(request_header) = request_header { | ||
let master_broker_id = request_header.master_broker_id; | ||
let master_address = request_header.master_address; | ||
let master_epoch = request_header.master_epoch; | ||
let sync_state_set_epoch = request_header.sync_state_set_epoch; | ||
let sync_state_set = sync_state_set_info.get_sync_state_set(); | ||
match replicas_mangesr | ||
.change_broker_role( | ||
master_broker_id, | ||
master_address, | ||
master_epoch, | ||
sync_state_set_epoch, | ||
sync_state_set, | ||
) | ||
.await | ||
{ | ||
Ok(_) => {} | ||
Err(e) => { | ||
panic!("Failed to call method change_broker_role: {}", e); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t panic on replica update failures; return an error response and log. Also fix typo.
replicas_mangesr
typo.panic!
on.await
error will take down the handler thread/process.
- if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() {
- if let Ok(request_header) = request_header {
- let master_broker_id = request_header.master_broker_id;
- let master_address = request_header.master_address;
- let master_epoch = request_header.master_epoch;
- let sync_state_set_epoch = request_header.sync_state_set_epoch;
- let sync_state_set = sync_state_set_info.get_sync_state_set();
- match replicas_mangesr
- .change_broker_role(
- master_broker_id,
- master_address,
- master_epoch,
- sync_state_set_epoch,
- sync_state_set,
- )
- .await
- {
- Ok(_) => {}
- Err(e) => {
- panic!("Failed to call method change_broker_role: {}", e);
- }
- }
- }
- }
+ if let Some(replicas_manager) = self.broker_runtime_inner.replicas_manager_mut() {
+ let master_broker_id = request_header.master_broker_id;
+ let master_address = request_header.master_address;
+ let master_epoch = request_header.master_epoch;
+ let sync_state_set_epoch = request_header.sync_state_set_epoch;
+ // Prefer move-out to avoid cloning if you add `into_sync_state_set()`.
+ let sync_state_set = sync_state_set_info.get_sync_state_set();
+ if let Err(e) = replicas_manager
+ .change_broker_role(
+ master_broker_id,
+ master_address,
+ master_epoch,
+ sync_state_set_epoch,
+ sync_state_set,
+ )
+ .await
+ {
+ error!("Failed to change_broker_role: {}", e);
+ return Some(response.set_code(ResponseCode::SystemError));
+ }
+ } else {
+ error!("ReplicasManager not initialized; cannot apply role change");
+ return Some(response.set_code(ResponseCode::SystemError));
+ }
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 64 to 86, fix the typo `replicas_mangesr` to `replicas_manager` and
remove the panic on change_broker_role failure; instead log the error (include
context and the error) and return an appropriate error response/result to the
caller (e.g., set the response code/remark or propagate a Err variant) so the
handler does not abort the thread/process.
let master_broker_id = request_header.master_broker_id; | ||
let master_address = request_header.master_address; | ||
let master_epoch = request_header.master_epoch; | ||
let sync_state_set_epoch = request_header.sync_state_set_epoch; | ||
let sync_state_set = sync_state_set_info.get_sync_state_set(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Confirm header field nullability and enforce required fields.
If NotifyBrokerRoleChangedRequestHeader
uses Option<...>
fields (as commonly done for headers), unwrap with validation and fail fast when missing. Otherwise, ignore this.
If the header fields are Option
, use this pattern:
+ // If header fields are Option<...>
+ let (Some(master_broker_id), Some(master_address), Some(master_epoch), Some(sync_state_set_epoch)) = (
+ request_header.master_broker_id,
+ request_header.master_address,
+ request_header.master_epoch,
+ request_header.sync_state_set_epoch,
+ ) else {
+ error!("Missing required header fields in NotifyBrokerRoleChangedRequestHeader");
+ return Some(response.set_code(ResponseCode::SystemError));
+ };
🏁 Script executed:
#!/bin/bash
# Inspect header and response codes to validate field types and usable error codes.
rg -n -C2 'struct\s+NotifyBrokerRoleChangedRequestHeader' rocketmq-remoting/src/protocol/header
rg -n -C2 'pub\s+enum\s+ResponseCode' rocketmq-remoting/src
Length of output: 972
Validate and unwrap required header Option fields before use.
NotifyBrokerRoleChangedRequestHeader fields are Option (master_address is annotated #[required]). Unwrap/validate required/used fields and fail fast (return ResponseCode::SystemError) if any are missing.
Location: rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (~lines 66–70)
Suggested fix:
+ let (Some(master_broker_id), Some(master_address), Some(master_epoch), Some(sync_state_set_epoch)) = (
+ request_header.master_broker_id,
+ request_header.master_address,
+ request_header.master_epoch,
+ request_header.sync_state_set_epoch,
+ ) else {
+ error!("Missing required header fields in NotifyBrokerRoleChangedRequestHeader");
+ return Some(response.set_code(ResponseCode::SystemError));
+ };
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let master_broker_id = request_header.master_broker_id; | |
let master_address = request_header.master_address; | |
let master_epoch = request_header.master_epoch; | |
let sync_state_set_epoch = request_header.sync_state_set_epoch; | |
let sync_state_set = sync_state_set_info.get_sync_state_set(); | |
let (Some(master_broker_id), Some(master_address), Some(master_epoch), Some(sync_state_set_epoch)) = ( | |
request_header.master_broker_id, | |
request_header.master_address, | |
request_header.master_epoch, | |
request_header.sync_state_set_epoch, | |
) else { | |
error!("Missing required header fields in NotifyBrokerRoleChangedRequestHeader"); | |
return Some(response.set_code(ResponseCode::SystemError)); | |
}; | |
let sync_state_set = sync_state_set_info.get_sync_state_set(); |
🔊@watchgou 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (5)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (5)
51-53
: Header decode can panic; fail fast and return an error response.Avoid panicking in request path; handle decode Result and log.
- let request_header = - request.decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>(); + let response = RemotingCommand::create_response_command(); + let request_header = match request + .decode_command_custom_header::<NotifyBrokerRoleChangedRequestHeader>() + { + Ok(h) => h, + Err(e) => { + error!("Failed to decode NotifyBrokerRoleChangedRequestHeader: {}", e); + return Some(response.set_code(ResponseCode::SystemError)); + } + };
54-56
: Unchecked body unwrap can crash the broker; decode defensively.Requests may have no body; don’t unwrap().
- let sync_state_set_info = - SyncStateSet::decode(request.get_body().unwrap()).unwrap_or_default(); + let sync_state_set_info = match request.get_body() { + Some(body) => SyncStateSet::decode(body).unwrap_or_default(), + None => SyncStateSet::default(), + };
57-63
: Logging currently uses expect on a Result; log the decoded header instead.After handling decode above, remove the panic‑prone expect.
- let response = RemotingCommand::create_response_command(); - info!( "Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", - request_header.as_ref().expect("null") + request_header );
64-82
: Don’t panic on role-change failures; handle missing ReplicasManager; fix variable name.
- Rename replicas_mangesr → replicas_manager.
- If manager is None, return SystemError.
- On .await error, log and return error instead of panic!.
- if let Some(replicas_mangesr) = self.broker_runtime_inner.replicas_manager_mut() { - if let Ok(request_header) = request_header { - match replicas_mangesr - .change_broker_role( - request_header.master_broker_id, - request_header.master_address, - request_header.master_epoch, - request_header.sync_state_set_epoch, - sync_state_set_info.get_sync_state_set(), - ) - .await - { - Ok(_) => {} - Err(e) => { - panic!("Failed to call method change_broker_role: {}", e); - } - } - } - } + if let Some(replicas_manager) = self.broker_runtime_inner.replicas_manager_mut() { + if let Err(e) = replicas_manager + .change_broker_role( + request_header.master_broker_id, + request_header.master_address, + request_header.master_epoch, + request_header.sync_state_set_epoch, + sync_state_set_info.get_sync_state_set(), + ) + .await + { + error!("Failed to change_broker_role: {}", e); + return Some(response.set_code(ResponseCode::SystemError)); + } + } else { + error!("ReplicasManager not initialized; cannot apply role change"); + return Some(response.set_code(ResponseCode::SystemError)); + }
84-85
: Don’t force Success; let earlier paths set errors.create_response_command() is already Success; just return response.
- Some(response.set_code(ResponseCode::Success)) + Some(response)
🧹 Nitpick comments (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (1)
28-28
: Import error-level logging for proper error handling.You use error! in the fixes below; bring it into scope.
-use tracing::info; +use tracing::{info, error};
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-broker/src/controller/replicas_manager.rs
(2 hunks)rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
(1 hunks)rocketmq-remoting/src/protocol/body/sync_state_set_body.rs
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- rocketmq-remoting/src/protocol/body/sync_state_set_body.rs
- rocketmq-broker/src/controller/replicas_manager.rs
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (3)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
code
(578-580)create_response_command
(206-210)rocketmq-broker/src/broker_runtime.rs (2)
message_store
(1932-1934)new
(151-282)rocketmq-broker/src/processor/admin_broker_processor.rs (1)
new
(88-125)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: auto-approve
request_header.master_broker_id, | ||
request_header.master_address, | ||
request_header.master_epoch, | ||
request_header.sync_state_set_epoch, | ||
sync_state_set_info.get_sync_state_set(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
🧩 Analysis chain
Validate required header fields before applying changes.
If these header fields are Option, enforce presence for required ones (e.g., master_address, master_epoch, sync_state_set_epoch) and return a client/error code when missing.
If master_address/master_epoch/sync_state_set_epoch are optional in the type but required semantically, add a check after decoding and before invoking change_broker_role.
🏁 Script executed:
#!/bin/bash
# Inspect header field types/attributes to confirm which are Option and #[required]
rg -n -C2 'struct\s+NotifyBrokerRoleChangedRequestHeader' rocketmq-remoting/src/protocol/header
rg -n -C2 '#\[required\]' rocketmq-remoting/src/protocol/header
Length of output: 61219
Validate required header fields before applying changes.
NotifyBrokerRoleChangedRequestHeader marks master_address, master_epoch, sync_state_set_epoch, master_broker_id as #[required] but their types are Option; enforce presence (match/ok_or) and return an appropriate client/error code when any are missing before calling change_broker_role. Fix in rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs (around lines 68–72).
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_broker_role_change_handler.rs
around lines 68–72, the handler currently passes Option<T> fields
(master_address, master_epoch, sync_state_set_epoch, master_broker_id) directly
into change_broker_role; validate each required header field by matching or
using ok_or to convert missing Options into an early error/response (returning
the appropriate client/error code) before calling change_broker_role.
Specifically, extract master_broker_id, master_address, master_epoch, and
sync_state_set_epoch from the request_header, return a client error if any is
None (with a clear error code/message), and only then call change_broker_role
with the unwrapped values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #4059
Brief Description
How Did You Test This Change?
Summary by CodeRabbit