-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4093]🚀Implement broker HA info retrieval and update logic in BrokerRuntime #4094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughIntroduces optional semantics for min_broker_addr across BrokerRuntimeInner APIs, adjusts master-online handling to conditionally retrieve HA info and sync flush offsets, and adds a BrokerOuterAPI method to request HA info from a master. Logging and internal state updates were adapted to Option types. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant R as BrokerRuntimeInner
participant M as Cluster/Membership
participant O as BrokerOuterAPI
participant MB as Master Broker
Note over R: update_min_broker(min_id, Option<addr>)
R->>M: Store min_broker_id, min_broker_addr (Option)
R->>R: on_min_broker_change(min_id, Option<addr>, offline_addr, Option<master_ha_addr>)
alt master_ha_addr is Some(...)
Note over R: Use provided HA addr
else master_ha_addr is None
R->>O: retrieve_broker_ha_info(Option<min_broker_addr>)
O->>MB: ExchangeBrokerHaInfo request
MB-->>O: ExchangeHaInfoResponseHeader
O-->>R: BrokerSyncInfo{master_addr, master_ha_addr, flush_offset}
end
alt Flush-offset sync required
Note over R: Perform master flush-offset synchronization
R->>MB: Sync flush offsets
MB-->>R: Sync result
else No sync needed
Note over R: Proceed without sync
end
R->>M: Update runtime state with Option addrs
Note over R: on_master_on_line(min_broker_addr: Option, master_ha_addr: Option)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements broker HA (High Availability) information retrieval and update logic in the BrokerRuntime module to support issue #4093. The changes enable brokers to exchange HA information and handle master broker state changes more effectively.
- Added a new API method to retrieve broker HA information from master brokers
- Updated broker runtime logic to handle optional broker addresses and improved master online/offline handling
- Implemented synchronization of master flush offset and HA address updates when a master comes online
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
rocketmq-broker/src/out_api/broker_outer_api.rs | Added retrieve_broker_ha_info method to fetch HA information from master brokers |
rocketmq-broker/src/broker_runtime.rs | Updated broker address handling to use Option<CheetahString> and implemented master online logic with HA info synchronization |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
let need_sync_master_flush_offset = | ||
self.message_store_unchecked().get_master_flushed_offset() == 0 | ||
&& self.message_store_config.all_ack_in_sync_state_set; | ||
if master_ha_addr.is_none() || need_sync_master_flush_offset { | ||
match self | ||
.broker_outer_api | ||
.retrieve_broker_ha_info(min_broker_addr.as_ref()) | ||
.await | ||
{ | ||
Ok(broker_sync_info) => { | ||
if need_sync_master_flush_offset { |
Copilot
AI
Sep 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The variable name need_sync_master_flush_offset
is verbose and could be shortened to should_sync_flush_offset
for better readability while maintaining clarity.
let need_sync_master_flush_offset = | |
self.message_store_unchecked().get_master_flushed_offset() == 0 | |
&& self.message_store_config.all_ack_in_sync_state_set; | |
if master_ha_addr.is_none() || need_sync_master_flush_offset { | |
match self | |
.broker_outer_api | |
.retrieve_broker_ha_info(min_broker_addr.as_ref()) | |
.await | |
{ | |
Ok(broker_sync_info) => { | |
if need_sync_master_flush_offset { | |
let should_sync_flush_offset = | |
self.message_store_unchecked().get_master_flushed_offset() == 0 | |
&& self.message_store_config.all_ack_in_sync_state_set; | |
if master_ha_addr.is_none() || should_sync_flush_offset { | |
match self | |
.broker_outer_api | |
.retrieve_broker_ha_info(min_broker_addr.as_ref()) | |
.await | |
{ | |
Ok(broker_sync_info) => { | |
if should_sync_flush_offset { |
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4094 +/- ##
==========================================
- Coverage 26.58% 26.56% -0.02%
==========================================
Files 575 575
Lines 81304 81361 +57
==========================================
Hits 21611 21611
- Misses 59693 59750 +57 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
rocketmq-broker/src/out_api/broker_outer_api.rs (2)
792-820
: Verify error handling for missing master broker address.The method accepts
master_broker_addr
as anOption
, but in the error case at line 818, it's passed tomap_or
which could result in an empty string in the error message whenNone
. Consider making the error message more informative when the address is missing.Err(RocketmqError::MQBrokerError( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - master_broker_addr.map_or("".to_string(), |s| s.to_string()), + master_broker_addr.map_or("<no master address>".to_string(), |s| s.to_string()), ))
796-800
: Consider adding timeout configuration parameter.The method uses a hardcoded timeout of 3000ms. For consistency with other methods in this file and better configurability, consider accepting a timeout parameter.
pub async fn retrieve_broker_ha_info( &self, master_broker_addr: Option<&CheetahString>, + timeout_millis: u64, ) -> rocketmq_error::RocketMQResult<BrokerSyncInfo> { let request_header = ExchangeHAInfoRequestHeader::default(); let request = RemotingCommand::create_request_command( RequestCode::ExchangeBrokerHaInfo, request_header, ); let response = self .remoting_client - .invoke_async(master_broker_addr, request, 3000) + .invoke_async(master_broker_addr, request, timeout_millis) .await?;
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-broker/src/broker_runtime.rs
(4 hunks)rocketmq-broker/src/out_api/broker_outer_api.rs
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-broker/src/out_api/broker_outer_api.rs (3)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
default
(144-159)create_request_command
(172-181)rocketmq-remoting/src/rpc/rpc_client_impl.rs (7)
response
(102-102)response
(137-137)response
(171-171)response
(205-205)response
(238-239)response
(272-273)response
(310-311)rocketmq-remoting/src/code/response_code.rs (4)
from
(29-31)from
(35-45)from
(117-119)from
(123-188)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (1)
update_min_broker
(96-126)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: build (windows-latest, stable)
- GitHub Check: auto-approve
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (macos-latest, nightly)
🔇 Additional comments (11)
rocketmq-broker/src/out_api/broker_outer_api.rs (2)
56-59
: LGTM! Clean addition of necessary imports for HA info retrieval.The imports for
BrokerSyncInfo
andExchangeHaInfoResponseHeader
are properly added to support the new HA info retrieval functionality.
807-812
: Good defensive programming with default value for master_flush_offset.The use of
unwrap_or(0)
formaster_flush_offset
ensures the code is resilient to missing offset values.rocketmq-broker/src/broker_runtime.rs (9)
2545-2546
: Clean propagation of optional semantics for min_broker_addr.The change from
unwrap_or_default()
tocloned()
correctly propagates theOption<CheetahString>
type through the system, maintaining consistency with the new optional semantics.
2550-2553
: LGTM! Consistent update of method signature.The
update_min_broker
method signature properly reflects the optional nature ofmin_broker_addr
.
2677-2682
: Well-structured optional parameter handling in on_min_broker_change.The method signature correctly accepts both
min_broker_addr
andmaster_ha_addr
asOption<CheetahString>
, aligning with the new optional semantics throughout the codebase.
2686-2691
: Improved logging with proper Option formatting.Using
{:?}
format specifier for the optional values provides clear visibility into whether the addresses areSome
orNone
.
2695-2695
: Clean state update for optional min_broker_addr.Direct assignment of the
Option<CheetahString>
without unnecessary wrapping maintains type consistency.
2709-2709
: Correct null-safety check for master online detection.The condition properly checks both that it's the master (MASTER_ID) and that the address exists before proceeding with master online logic.
2728-2768
: Excellent implementation of conditional HA info retrieval with proper error handling.The implementation intelligently retrieves broker HA info only when necessary (missing
master_ha_addr
or need to sync flush offset), avoiding unnecessary network calls. The error handling and logging are comprehensive.Key strengths:
- Conditional retrieval logic reduces unnecessary network overhead
- Proper error handling with informative logging
- Clean separation between flush offset sync and HA address update logic
- Maintains backwards compatibility by still accepting the
master_ha_addr
parameter
2732-2734
: Verify timeout configuration for HA info retrieval.The call to
retrieve_broker_ha_info
should include a timeout parameter if you accept my earlier suggestion to add one to that method.If the timeout parameter is added to
retrieve_broker_ha_info
, update this call:match self .broker_outer_api - .retrieve_broker_ha_info(min_broker_addr.as_ref()) + .retrieve_broker_ha_info(min_broker_addr.as_ref(), 3000) .await
2723-2726
: Consistent optional parameter handling in on_master_on_line.Both
min_broker_addr
andmaster_ha_addr
are properly typed asOption<CheetahString>
, maintaining consistency with the caller.
Which Issue(s) This PR Fixes(Closes)
Fixes #4093
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes
Refactor