Broker id change #4035
Walkthrough

Adds HA info exchange support: a new handler wired into the admin processor, a new response header type, and runtime accessors. Implements in-group min-broker tracking with async updates and lifecycle hooks. Exposes a protocol module for the new response header. Integrates request handling for ExchangeBrokerHaInfo.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Client
    participant AdminBrokerProcessor as AdminBrokerProcessor
    participant UpdateBrokerHaHandler as UpdateBrokerHaHandler
    participant BrokerRuntime as BrokerRuntimeInner
    participant MessageStore as MessageStore
    Client->>AdminBrokerProcessor: RemotingCommand(RequestCode::ExchangeBrokerHaInfo)
    AdminBrokerProcessor->>UpdateBrokerHaHandler: update_broker_ha_info(...)
    UpdateBrokerHaHandler->>UpdateBrokerHaHandler: Decode ExchangeHAInfoRequestHeader
    alt header.masterHaAddress present and non-empty
        UpdateBrokerHaHandler->>BrokerRuntime: set HA master address
        opt message_store available
            UpdateBrokerHaHandler->>MessageStore: update master address
            alt sync-on-startup enabled and current masterFlushOffset == 0
                UpdateBrokerHaHandler->>MessageStore: set master flushed offset
            end
        end
        UpdateBrokerHaHandler-->>AdminBrokerProcessor: Success response
    else header.masterHaAddress present but empty and broker is MASTER
        UpdateBrokerHaHandler->>BrokerRuntime: read HA server addr and offsets
        UpdateBrokerHaHandler-->>AdminBrokerProcessor: Success + ExchangeHaInfoResponseHeader
    else
        UpdateBrokerHaHandler-->>AdminBrokerProcessor: Success response
    end
    AdminBrokerProcessor-->>Client: RemotingCommand(Response)
```
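The three-way branching in the diagram above can be sketched as a small decision function. This is an illustrative sketch only: the enum, function, and parameter names are assumptions, not the actual handler API, which operates on RemotingCommand and BrokerRuntimeInner.

```rust
// Hypothetical model of the branch selection described in the diagram above.
#[derive(Debug, PartialEq)]
enum HaExchangeAction {
    /// masterHaAddress present and non-empty: adopt the master's HA address.
    UpdateMasterInfo,
    /// masterHaAddress present but empty on a MASTER broker: report our own info.
    ReportMasterInfo,
    /// Anything else: acknowledge without further action.
    AckOnly,
}

fn classify(master_ha_address: Option<&str>, is_master: bool) -> HaExchangeAction {
    match master_ha_address {
        Some(addr) if !addr.is_empty() => HaExchangeAction::UpdateMasterInfo,
        Some(_) if is_master => HaExchangeAction::ReportMasterInfo,
        _ => HaExchangeAction::AckOnly,
    }
}

fn main() {
    assert_eq!(classify(Some("10.0.0.1:10912"), false), HaExchangeAction::UpdateMasterInfo);
    assert_eq!(classify(Some(""), true), HaExchangeAction::ReportMasterInfo);
    assert_eq!(classify(None, true), HaExchangeAction::AckOnly);
}
```

All three branches return a Success response; only the ReportMasterInfo path additionally attaches an ExchangeHaInfoResponseHeader.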
```mermaid
sequenceDiagram
    autonumber
    participant Controller as NotifyMinBrokerChangeIdHandler
    participant Lock as RWLock<MinBrokerIngroup>
    participant Runtime as BrokerRuntimeInner
    participant Store as MessageStore
    Controller->>Controller: update_min_broker(header)
    alt enabled && broker_id != MASTER_ID
        Controller->>Lock: try_read/write (timeout 3s)
        alt minBrokerId changed
            Controller->>Controller: on_min_broker_change(...)
            Controller->>Lock: update state
            Controller->>Controller: change_special_service_status(should_start)
            Controller->>Runtime: start/stop special services
            alt offline_broker_addr == known master
                Controller->>Controller: on_master_offline()
                Controller->>Runtime: clear master addresses
                Controller->>Store: clear master addresses
            end
            alt min_broker_id == MASTER_ID or min_broker_addr non-empty
                Controller->>Controller: on_master_on_line(addr, ha_addr)
                note right of Controller: May sync master flush offset (placeholders for unimplemented branches)
            end
        end
    end
```
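The min-broker flow above hinges on two comparisons: did the notified minimum change, and is this broker the new minimum. A minimal, self-contained sketch follows; the struct field, constant value, and function names are assumptions modeled on the diagram, not the actual crate types.

```rust
const MASTER_ID: u64 = 0; // assumed value; the real constant lives in the rocketmq crates

struct MinBrokerInGroup {
    min_broker_id_in_group: Option<u64>,
}

/// True when the notified min broker differs from the tracked one,
/// i.e. when on_min_broker_change should run.
fn min_broker_changed(state: &MinBrokerInGroup, notified_min_id: u64) -> bool {
    state.min_broker_id_in_group != Some(notified_min_id)
}

/// Special services should (re)start only when this broker is the new minimum.
fn should_start_special_services(local_broker_id: u64, new_min_id: u64) -> bool {
    local_broker_id == new_min_id
}

fn main() {
    let state = MinBrokerInGroup { min_broker_id_in_group: Some(MASTER_ID) };
    assert!(min_broker_changed(&state, 2));
    assert!(!min_broker_changed(&state, MASTER_ID));
    assert!(should_start_special_services(2, 2));
    assert!(!should_start_special_services(2, 0));
}
```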
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks (2 passed, 1 inconclusive)

❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
🔊 @watchgou 🚀 Thanks for your contribution 🎉! 💡 CodeRabbit (AI) will review your code first 🔥!

Note: 🚨 The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review 💥.
LGTM
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (1)
Lines 75-84: Avoid panics on header decode and fix error message.

unwrap() will crash the broker on malformed requests. Handle decode failures and avoid expect(...) on optional fields.

```diff
- let change_header = request
-     .decode_command_custom_header::<NotifyMinBrokerIdChangeRequestHeader>()
-     .unwrap();
-
- let latest_broker_id = change_header
-     .min_broker_id
-     .expect("min broker id not must be present");
+ let change_header = match request
+     .decode_command_custom_header::<NotifyMinBrokerIdChangeRequestHeader>()
+ {
+     Ok(h) => h,
+     Err(e) => {
+         error!("Failed to decode NotifyMinBrokerIdChangeRequestHeader: {}", e);
+         let mut response = RemotingCommand::default();
+         response.set_code_ref(ResponseCode::SystemError);
+         response.set_remark(format!("Invalid request header: {}", e));
+         return Some(response);
+     }
+ };
```
🧹 Nitpick comments (6)
rocketmq-broker/src/broker_runtime.rs (1)

Lines 2060-2070: Expose a more ergonomic API and avoid silent no-op.

- Returning a status helps callers know whether an update occurred.
- Accepting Into<CheetahString> avoids an unnecessary allocation at call sites.

```diff
 #[inline]
 pub fn slave_synchronize(&self) -> &Option<SlaveSynchronize<MS>> {
     &self.slave_synchronize
 }

 #[inline]
-pub fn update_slave_master_addr(&mut self, master_addr: CheetahString) {
-    if let Some(ref mut slave) = self.slave_synchronize {
-        slave.set_master_addr(master_addr);
-    };
-}
+pub fn update_slave_master_addr<I: Into<CheetahString>>(&mut self, master_addr: I) -> bool {
+    if let Some(ref mut slave) = self.slave_synchronize {
+        slave.set_master_addr(master_addr.into());
+        true
+    } else {
+        false
+    }
+}
```

rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs (2)
Lines 21-27: Keep acronym casing consistent with the request header.

The request uses ExchangeHAInfoRequestHeader while the response is ExchangeHaInfoResponseHeader. Consider renaming to ExchangeHAInfoResponseHeader for consistency across the API. Low impact now while adoption is limited.

Would you like me to prepare a follow-up rename patch across the crate?

Lines 24-26: Skip None fields during serialization.

Keeps payloads compact and avoids emitting nulls in JSON-based tests/logs.

```diff
 pub struct ExchangeHaInfoResponseHeader {
-    pub master_ha_address: Option<CheetahString>,
-    pub master_flush_offset: Option<i64>,
-    pub master_address: Option<CheetahString>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub master_ha_address: Option<CheetahString>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub master_flush_offset: Option<i64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub master_address: Option<CheetahString>,
 }
```

rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)
Lines 165-168: Tighten the "master online" condition.

The current condition fires if either id == MASTER_ID or the addr is non-empty. Consider requiring both to reduce false positives.

```diff
- if min_broker_id == MASTER_ID || !min_broker_addr.is_empty() {
+ if min_broker_id == MASTER_ID && !min_broker_addr.is_empty() {
```

Lines 51-57: Confirm MASTER_ID type vs u64.

min_broker_id_in_group is an Option<u64>, but MASTER_ID's type may differ. If it is not u64, cast explicitly.

```diff
- min_broker_id_in_group: Some(MASTER_ID),
+ min_broker_id_in_group: Some(MASTER_ID as u64),
```

Lines 223-229: API surface: avoid exposing Arc in a getter.

Returning an Arc leaks internal synchronization details. Prefer returning CheetahString (clone) or &CheetahString with a guard pattern.

Want me to sketch a small accessor wrapper that avoids cloning on the hot path?
📜 Review details

Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (6)

- rocketmq-broker/src/broker_runtime.rs (1 hunks)
- rocketmq-broker/src/processor/admin_broker_processor.rs (6 hunks)
- rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (2 hunks)
- rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs (1 hunks)
🧰 Additional context used

🧬 Code graph analysis (5)

rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs (2)
- rocketmq-broker/src/broker_runtime.rs (1): message_store (1908-1910)
- rocketmq-broker/src/processor/admin_broker_processor.rs (1): new (79-105)

rocketmq-broker/src/broker_runtime.rs (1)
- rocketmq-broker/src/slave/slave_synchronize.rs (1): master_addr (46-48)

rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (2)
- rocketmq-broker/src/broker_runtime.rs (8): message_store (1908-1910), broker_config (271-273), broker_config (1838-1840), on_min_broker_change (2562-2570), get_min_broker_id_in_group (2237-2239), get_broker_addr (2443-2445), change_special_service_status (2497-2509), slave_synchronize (2061-2063)
- rocketmq-broker/src/slave/slave_synchronize.rs (1): master_addr (46-48)

rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs (1)
- rocketmq-common/src/utils/parse_config_file.rs (1): cfg (36-36)

rocketmq-broker/src/processor/admin_broker_processor.rs (6)
- rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3): new (52-57), new (61-66), request (75-76)
- rocketmq-broker/src/broker_runtime.rs (1): new (140-269)
- rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs (3): new (44-48), request (59-60), request (94-95)
- rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs (1): new (46-50)
- rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs (3): new (46-50), request (62-63), request (115-116)
- rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (7): new (62-66), request (78-79), request (314-315), request (448-449), request (527-528), request (574-575), request (600-601)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: auto-approve
🔇 Additional comments (6)
rocketmq-remoting/src/protocol/header.rs (1)

Line 32: LGTM: response header export added.

rocketmq-broker/src/processor/admin_broker_processor.rs (2)

Lines 36-46: Wiring for the HA handler looks correct.

Module import, construction, and field storage are consistent with other handlers.

Also applies to: 58-59, 92-93, 103-104

Lines 235-239: Routing ExchangeBrokerHaInfo to the new handler is fine; ensure error codes propagate.

If the handler needs to return non-Success responses, have it set an appropriate ResponseCode and remark; the match arm already forwards its result.

Do we have integration tests covering a malformed ExchangeHAInfoRequestHeader path?
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (3)

Lines 171-176: Service status propagation looks good.

Async delegation to BrokerRuntimeInner::change_special_service_status is appropriate.

Lines 100-102: Scope of updates when acting as master.

You skip updates unless enable_slave_acting_master && broker_id != MASTER_ID. Confirm masters don't need to react to NotifyMinBrokerIdChange.

Lines 110-111: Compare against the tracked in-group min, not the local broker_id.

Read the tracked cluster min from the same lock and compare the incoming min_broker_id to that — do not call self.broker_runtime_inner.get_min_broker_id_in_group() here.

File: rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (lines 110-111).
```rust
if self
    .lock
    .try_write_timeout(Duration::from_millis(3000))
    .await
    .is_some()
{
    if let Some(min_broker_id) = change_header.min_broker_id {
        if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() {
            // on min broker change
            let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();

            self.on_min_broker_change(
                min_broker_id,
                min_broker_addr,
                &change_header.offline_broker_addr,
                &change_header.ha_broker_addr,
            )
            .await;
        }
    }
} else {
    error!("Update min broker failed");
}
```
🛠️ Refactor suggestion
Fix lock misuse: ignoring write-guard and read-under-write causes race/deadlock
- The result of try_write_timeout(...) is discarded; you immediately reacquire a fresh write-lock later. This is TOCTOU and defeats the timeout.
- In on_min_broker_change you hold a write lock and then await a read lock from the same RwLock (Lines 148-150) — this can deadlock.
Refactor to (a) compare against the tracked in-group value under a read lock, (b) avoid the unused try_write_timeout, and (c) compute should_start without re-locking:
```diff
@@
-            if self
-                .lock
-                .try_write_timeout(Duration::from_millis(3000))
-                .await
-                .is_some()
-            {
-                if let Some(min_broker_id) = change_header.min_broker_id {
-                    if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() {
-                        // on min broker change
-                        let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();
-
-                        self.on_min_broker_change(
-                            min_broker_id,
-                            min_broker_addr,
-                            &change_header.offline_broker_addr,
-                            &change_header.ha_broker_addr,
-                        )
-                        .await;
-                    }
-                }
-            } else {
-                error!("Update min broker failed");
-            }
+            if let Some(min_broker_id) = change_header.min_broker_id {
+                let current = self.lock.read().await.min_broker_id_in_group;
+                if current != Some(min_broker_id) {
+                    let min_broker_addr = change_header
+                        .min_broker_addr
+                        .as_deref()
+                        .unwrap_or_default();
+                    self.on_min_broker_change(
+                        min_broker_id,
+                        min_broker_addr,
+                        &change_header.offline_broker_addr,
+                        &change_header.ha_broker_addr,
+                    )
+                    .await;
+                }
+            } else {
+                warn!("min_broker_id missing; skip update_min_broker");
+            }
@@
-        info!(
-            "Min broker changed, old: {}-{}, new {}-{}",
-            self.broker_runtime_inner.get_min_broker_id_in_group(),
-            self.broker_runtime_inner.get_broker_addr(),
-            min_broker_id,
-            min_broker_addr
-        );
-
-        let mut lock_guard = self.lock.write().await;
-        lock_guard.min_broker_id_in_group = Some(min_broker_id);
-        lock_guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));
-
-        let should_start = self.broker_runtime_inner.get_min_broker_id_in_group()
-            == self.lock.read().await.min_broker_id_in_group.unwrap();
+        let mut guard = self.lock.write().await;
+        let prev_id = guard.min_broker_id_in_group;
+        let prev_addr = guard.min_broker_addr_in_group.clone();
+        info!(
+            "Min broker changed, old: {:?}-{}, new {}-{}",
+            prev_id, prev_addr, min_broker_id, min_broker_addr
+        );
+        guard.min_broker_id_in_group = Some(min_broker_id);
+        guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));
+        drop(guard);
+
+        let should_start =
+            self.broker_runtime_inner.get_min_broker_id_in_group() == min_broker_id;
```
Also applies to: 136-151
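The core of the fix pattern above, update under one write guard, release it, then compute follow-up state without re-locking, can be shown with std's RwLock. This is a generic sketch with assumed names, not the async tokio lock or the handler's actual types.

```rust
use std::sync::RwLock;

struct State {
    min_broker_id: Option<u64>,
}

// Update the tracked min under a single write guard, capture what we need,
// then drop the guard before any further work (no read-under-write).
fn apply_min_broker(lock: &RwLock<State>, new_min: u64) -> (Option<u64>, u64) {
    let mut guard = lock.write().unwrap();
    let prev = guard.min_broker_id;
    guard.min_broker_id = Some(new_min);
    let current = guard.min_broker_id.unwrap();
    drop(guard); // release before computing should_start or calling other services
    (prev, current)
}

fn main() {
    let lock = RwLock::new(State { min_broker_id: Some(0) });
    let (prev, current) = apply_min_broker(&lock, 2);
    assert_eq!(prev, Some(0));
    assert_eq!(current, 2);
    // the lock is free again: a subsequent read acquires without blocking
    assert_eq!(lock.read().unwrap().min_broker_id, Some(2));
}
```

With an async RwLock the same discipline applies, with the added rule that the guard must be dropped before any await that might touch the same lock.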
```rust
let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();
```
🛠️ Refactor suggestion
Don't unwrap min_broker_addr
min_broker_addr may be absent; unwrap() panics. Use a safe default or branch.
Covered in the larger diff above by using as_deref().unwrap_or_default().
🤖 Prompt for AI Agents

In rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs around lines 112 to 113, the code calls change_header.min_broker_addr.as_deref().unwrap(), which can panic if min_broker_addr is None; replace this unwrap with a safe fallback or branch — for example use as_deref().unwrap_or_default() to get an empty string when absent, or explicitly match/if-let on min_broker_addr to handle the None case and proceed accordingly, ensuring no panics.
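The two non-panicking alternatives suggested above behave as follows. Plain std code, independent of the handler's types; the helper name is illustrative.

```rust
// Empty string as a stand-in when the optional address is absent.
fn safe_addr(opt: &Option<String>) -> &str {
    opt.as_deref().unwrap_or_default()
}

fn main() {
    let present = Some("10.0.0.1:10912".to_string());
    let absent: Option<String> = None;

    // as_deref() borrows as &str without moving; unwrap_or_default() yields ""
    assert_eq!(safe_addr(&present), "10.0.0.1:10912");
    assert_eq!(safe_addr(&absent), "");

    // explicit branch when "" is not an acceptable stand-in
    match absent.as_deref() {
        Some(addr) => println!("min broker addr: {addr}"),
        None => println!("min broker addr missing; skipping update"),
    }
}
```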
```rust
let need_sync_master_flush_offset =
    if let Some(message_store) = self.broker_runtime_inner.message_store() {
        message_store.get_master_flushed_offset() == 0x0000
            && self
                .broker_runtime_inner
                .message_store_config()
                .sync_master_flush_offset_when_startup
    } else {
        false
    };

if master_ha_addr.is_none() || need_sync_master_flush_offset {
    if need_sync_master_flush_offset {
        unimplemented!();
    }

    if master_ha_addr.is_none() {
        let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
        if let Some(_message_store) = broker_runtime_inner.message_store() {
            unimplemented!("");
        }
    }
}
```
🛠️ Refactor suggestion
Replace unimplemented!() with non-fatal behavior
unimplemented!() will panic the broker at runtime. Log and return until the implementation is ready.
```diff
- if master_ha_addr.is_none() || need_sync_master_flush_offset {
-     if need_sync_master_flush_offset {
-         unimplemented!();
-     }
-
-     if master_ha_addr.is_none() {
-         let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
-         if let Some(_message_store) = broker_runtime_inner.message_store() {
-             unimplemented!("");
-         }
-     }
- }
+ if master_ha_addr.is_none() || need_sync_master_flush_offset {
+     if need_sync_master_flush_offset {
+         warn!("Syncing master flush offset is not implemented yet");
+     }
+     if master_ha_addr.is_none() {
+         let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
+         if let Some(_message_store) = broker_runtime_inner.message_store() {
+             warn!("Updating message store with master HA addr is not implemented yet");
+         }
+     }
+ }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let need_sync_master_flush_offset =
    if let Some(message_store) = self.broker_runtime_inner.message_store() {
        message_store.get_master_flushed_offset() == 0x0000
            && self
                .broker_runtime_inner
                .message_store_config()
                .sync_master_flush_offset_when_startup
    } else {
        false
    };
if master_ha_addr.is_none() || need_sync_master_flush_offset {
    if need_sync_master_flush_offset {
        warn!("Syncing master flush offset is not implemented yet");
    }
    if master_ha_addr.is_none() {
        let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
        if let Some(_message_store) = broker_runtime_inner.message_store() {
            warn!("Updating message store with master HA addr is not implemented yet");
        }
    }
}
```
🤖 Prompt for AI Agents

In rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs around lines 198 to 221, replace the unimplemented!() calls with non-fatal behavior: log a warning or info including context (e.g., why master flush sync or the message_store branch is not implemented) and then return early from the handler (or otherwise exit this code path) instead of panicking; ensure you use the existing logger instance available on self or broker_runtime_inner, include relevant variables in the message (like need_sync_master_flush_offset or whether message_store is present), and avoid changing control flow beyond returning/short-circuiting so the broker continues running.
```rust
let exchange_request_header = request
    .decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
    .unwrap();
```
Don’t unwrap on header decode; return a proper error response.
Unwrap will panic on malformed/missing headers. Decode errors should map to a non-Success ResponseCode.
```diff
- let exchange_request_header = request
-     .decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
-     .unwrap();
+ let exchange_request_header = match request
+     .decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
+ {
+     Ok(h) => h,
+     Err(e) => {
+         let mut resp = RemotingCommand::default();
+         resp.set_code_ref(ResponseCode::SystemError);
+         resp.set_remark(format!("Failed to decode ExchangeHAInfoRequestHeader: {e}"));
+         return Some(resp);
+     }
+ };
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let exchange_request_header = match request
    .decode_command_custom_header::<ExchangeHAInfoRequestHeader>()
{
    Ok(h) => h,
    Err(e) => {
        let mut resp = RemotingCommand::default();
        resp.set_code_ref(ResponseCode::SystemError);
        resp.set_remark(format!("Failed to decode ExchangeHAInfoRequestHeader: {e}"));
        return Some(resp);
    }
};
```
🤖 Prompt for AI Agents

In rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs around lines 50-53, replace the unwrap on request.decode_command_custom_header::<ExchangeHAInfoRequestHeader>() with proper error handling: attempt to decode the header and if it fails or is missing, construct and return a RemotingCommand response with a non-success ResponseCode (e.g., REQUEST_HEADER_NOT_WELL_FORMED or an equivalent bad-request code), set the response remark to include the decode error or "missing header", and return early instead of panicking; otherwise continue with the decoded header.
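The shape of this fix, decode and on failure return an error response instead of panicking, shown in isolation. The types below are mocks for illustration; the real ResponseCode and RemotingCommand APIs differ.

```rust
#[derive(Debug, PartialEq)]
enum Code {
    Success,
    SystemError,
}

struct Response {
    code: Code,
    remark: String,
}

// Stand-in for decode_command_custom_header: fails on an empty payload.
fn decode_header(payload: &str) -> Result<String, String> {
    if payload.is_empty() {
        Err("missing header".to_string())
    } else {
        Ok(payload.to_string())
    }
}

fn handle(payload: &str) -> Response {
    let header = match decode_header(payload) {
        Ok(h) => h,
        Err(e) => {
            // map decode failure to a non-success response; no panic
            return Response {
                code: Code::SystemError,
                remark: format!("Failed to decode header: {e}"),
            };
        }
    };
    Response {
        code: Code::Success,
        remark: format!("decoded {header}"),
    }
}

fn main() {
    assert_eq!(handle("masterHaAddress=10.0.0.1:10912").code, Code::Success);
    let bad = handle("");
    assert_eq!(bad.code, Code::SystemError);
    assert!(bad.remark.contains("missing header"));
}
```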
```rust
if let Some(master_ha_addr) = exchange_request_header.master_ha_address {
    if !master_ha_addr.is_empty() {
        if let Some(message_store) = self.broker_runtime_inner.message_store() {
            message_store
                .update_ha_master_address(master_ha_addr.as_str())
                .await;

            let master_address = exchange_request_header.master_address.unwrap_or_default();
            message_store.update_master_address(&master_address);

            let is_sync_master_flush_offset_when_startip = self
                .broker_runtime_inner
                .message_store_config()
                .sync_master_flush_offset_when_startup;
            if message_store.get_master_flushed_offset() == 0x000
                && is_sync_master_flush_offset_when_startip
            {
                let master_flush_offset =
                    exchange_request_header.master_flush_offset.unwrap();

                info!(
                    "Set master flush offset in slave to {}",
                    master_flush_offset
                );
                message_store.set_master_flushed_offset(master_flush_offset);
            }
        }
```
Fix partial-move bug, avoid unwraps, and update SlaveSynchronize too.

- Using `if let Some(master_ha_addr) = exchange_request_header.master_ha_address` moves the field, preventing subsequent access to other fields — this won't compile. Borrow with `as_ref()` or pattern-match on references.
- Avoid `unwrap()` on `master_flush_offset`.
- Keep SlaveSynchronize's master address in sync via `update_slave_master_addr`.
```diff
- if let Some(master_ha_addr) = exchange_request_header.master_ha_address {
-     if !master_ha_addr.is_empty() {
+ if let Some(master_ha_addr) = exchange_request_header.master_ha_address.as_ref() {
+     if !master_ha_addr.is_empty() {
          if let Some(message_store) = self.broker_runtime_inner.message_store() {
              message_store
                  .update_ha_master_address(master_ha_addr.as_str())
                  .await;
-
-             let master_address = exchange_request_header.master_address.unwrap_or_default();
-             message_store.update_master_address(&master_address);
-
-             let is_sync_master_flush_offset_when_startip = self
+             let master_address =
+                 exchange_request_header.master_address.clone().unwrap_or_default();
+             message_store.update_master_address(&master_address);
+             // keep SlaveSynchronize in sync
+             let _ = self
+                 .broker_runtime_inner
+                 .update_slave_master_addr(master_address);
+
+             let sync_master_flush_on_startup = self
                  .broker_runtime_inner
                  .message_store_config()
                  .sync_master_flush_offset_when_startup;
-             if message_store.get_master_flushed_offset() == 0x000
-                 && is_sync_master_flush_offset_when_startip
+             if message_store.get_master_flushed_offset() == 0
+                 && sync_master_flush_on_startup
              {
-                 let master_flush_offset =
-                     exchange_request_header.master_flush_offset.unwrap();
-
-                 info!(
-                     "Set master flush offset in slave to {}",
-                     master_flush_offset
-                 );
-                 message_store.set_master_flushed_offset(master_flush_offset);
+                 if let Some(master_flush_offset) = exchange_request_header.master_flush_offset
+                 {
+                     info!("Set master flush offset in slave to {}", master_flush_offset);
+                     message_store.set_master_flushed_offset(master_flush_offset);
+                 }
              }
          }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
if let Some(master_ha_addr) = exchange_request_header.master_ha_address.as_ref() {
    if !master_ha_addr.is_empty() {
        if let Some(message_store) = self.broker_runtime_inner.message_store() {
            message_store
                .update_ha_master_address(master_ha_addr.as_str())
                .await;

            let master_address =
                exchange_request_header.master_address.clone().unwrap_or_default();
            message_store.update_master_address(&master_address);
            // keep SlaveSynchronize in sync
            let _ = self
                .broker_runtime_inner
                .update_slave_master_addr(master_address);

            let sync_master_flush_on_startup = self
                .broker_runtime_inner
                .message_store_config()
                .sync_master_flush_offset_when_startup;
            if message_store.get_master_flushed_offset() == 0
                && sync_master_flush_on_startup
            {
                if let Some(master_flush_offset) = exchange_request_header.master_flush_offset
                {
                    info!("Set master flush offset in slave to {}", master_flush_offset);
                    message_store.set_master_flushed_offset(master_flush_offset);
                }
            }
        }
```
```rust
let response_header = response
    .read_custom_header_mut::<ExchangeHaInfoResponseHeader>()
    .unwrap();

response_header.master_ha_address =
    Some(self.broker_runtime_inner.get_ha_server_addr());

if let Some(message_store) = self.broker_runtime_inner.message_store() {
    response_header.master_flush_offset =
        Some(message_store.get_broker_init_max_offset());
}
response_header.master_address =
    Some(self.broker_runtime_inner.get_broker_addr().clone());
```
💡 Verification agent
🧩 Analysis chain
Response header construction likely incorrect; avoid read_custom_header_mut on a fresh response.

Calling read_custom_header_mut() on a default response is prone to an Err/unwrap panic. Prefer the established pattern used elsewhere (e.g., build a header instance and attach it to the response via the RemotingCommand API).

Please confirm the canonical way to attach a response header in this codebase and update accordingly. Script to locate existing patterns is below.

Proposed shape (adjust to the actual API):
```diff
- let response_header = response
-     .read_custom_header_mut::<ExchangeHaInfoResponseHeader>()
-     .unwrap();
-
- response_header.master_ha_address =
-     Some(self.broker_runtime_inner.get_ha_server_addr());
+ let mut header = ExchangeHaInfoResponseHeader {
+     master_ha_address: Some(self.broker_runtime_inner.get_ha_server_addr()),
+     master_flush_offset: None,
+     master_address: Some(self.broker_runtime_inner.get_broker_addr().clone()),
+ };
- if let Some(message_store) = self.broker_runtime_inner.message_store() {
-     response_header.master_flush_offset =
-         Some(message_store.get_broker_init_max_offset());
- }
- response_header.master_address =
-     Some(self.broker_runtime_inner.get_broker_addr().clone());
+ if let Some(message_store) = self.broker_runtime_inner.message_store() {
+     header.master_flush_offset = Some(message_store.get_broker_init_max_offset());
+ }
+ // Attach header to response (replace with the correct API)
+ // response.write_custom_header(&header);
```
🏁 Script executed:

```bash
#!/bin/bash
# Find how response headers are attached to RemotingCommand across the repo
rg -nP -C3 --type=rust '\b(RemotingCommand::create_response_command|write_custom_header|set_custom_header|make_custom_header|read_custom_header_mut<)'
```

Length of output: 125690
Don't call read_custom_header_mut() on a fresh RemotingCommand — attach a header instead.
read_custom_header_mut() returns None for a newly-created RemotingCommand (unwrap will panic). Construct an ExchangeHaInfoResponseHeader, set master_ha_address / master_flush_offset / master_address, then attach it with RemotingCommand::create_response_command_with_header(header) or response.set_command_custom_header(header).
Location: rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs (≈ lines 90–103).
🤖 Prompt for AI Agents

In rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs around lines 90 to 103, the code calls response.read_custom_header_mut::<ExchangeHaInfoResponseHeader>().unwrap() on a fresh RemotingCommand, which returns None and will panic; instead, instantiate an ExchangeHaInfoResponseHeader, populate master_ha_address, master_flush_offset (if message_store exists), and master_address from broker_runtime_inner, then attach it to the response using RemotingCommand::create_response_command_with_header(header) or response.set_command_custom_header(header) (do not call read_custom_header_mut or unwrap).
Which Issue(s) This PR Fixes (Closes)
Fixes #issue_id
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Tests