mxsm
Owner

@mxsm mxsm commented Sep 22, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4093

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Added an external API to retrieve master HA information, enabling automatic discovery of HA address and flush-offset details.
  • Bug Fixes

    • Improved resilience when the minimum broker address is unavailable, preventing fallback to incorrect defaults.
  • Refactor

    • Optimized master-online handling to sync flush offsets only when necessary, reducing unnecessary operations.
    • Updated logs to clearly reflect when the minimum broker address is absent, improving diagnostics.

@Copilot Copilot AI review requested due to automatic review settings September 22, 2025 07:38
@rocketmq-rust-bot
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Contributor

coderabbitai bot commented Sep 22, 2025

Walkthrough

Introduces optional semantics for min_broker_addr across BrokerRuntimeInner APIs, adjusts master-online handling to conditionally retrieve HA info and sync flush offsets, and adds a BrokerOuterAPI method to request HA info from a master. Logging and internal state updates were adapted to Option types.

Changes

| Cohort / File(s) | Summary of edits |
| --- | --- |
| Optional min_broker_addr and master-online flow: rocketmq-broker/src/broker_runtime.rs | Changed min_broker_addr type to Option across update_min_broker, on_min_broker_change, on_master_on_line. Removed defaulting in retrieval, updated checks to is_some(). Reworked master-online path to conditionally retrieve HA info and sync flush offsets based on provided master_ha_addr and need. Adjusted logging and state storage to handle Option. |
| HA info retrieval API: rocketmq-broker/src/out_api/broker_outer_api.rs | Added BrokerOuterAPI::retrieve_broker_ha_info(master_broker_addr: Option<&CheetahString>) returning BrokerSyncInfo by sending ExchangeBrokerHaInfo and decoding ExchangeHaInfoResponseHeader. Introduced new imports and error propagation with response code/remark and optional master address. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant R as BrokerRuntimeInner
  participant M as Cluster/Membership
  participant O as BrokerOuterAPI
  participant MB as Master Broker

  Note over R: update_min_broker(min_id, Option<addr>)
  R->>M: Store min_broker_id, min_broker_addr (Option)
  R->>R: on_min_broker_change(min_id, Option<addr>, offline_addr, Option<master_ha_addr>)

  alt master_ha_addr is Some(...)
    Note over R: Use provided HA addr
  else master_ha_addr is None
    R->>O: retrieve_broker_ha_info(Option<min_broker_addr>)
    O->>MB: ExchangeBrokerHaInfo request
    MB-->>O: ExchangeHaInfoResponseHeader
    O-->>R: BrokerSyncInfo{master_addr, master_ha_addr, flush_offset}
  end

  alt Flush-offset sync required
    Note over R: Perform master flush-offset synchronization
    R->>MB: Sync flush offsets
    MB-->>R: Sync result
  else No sync needed
    Note over R: Proceed without sync
  end

  R->>M: Update runtime state with Option addrs
  Note over R: on_master_on_line(min_broker_addr: Option, master_ha_addr: Option)
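
The first branch of the diagram (use the provided HA address, otherwise ask the master) can be sketched in plain Rust. This is a minimal synchronous sketch under stated assumptions, not the project's code: `String` stands in for `CheetahString`, `SyncInfo` is a simplified stand-in for `BrokerSyncInfo`, and `fetch` is a hypothetical closure that replaces the network call to the master.

```rust
/// Simplified stand-in for `BrokerSyncInfo`; field names follow the diagram.
#[derive(Debug, Clone, PartialEq)]
struct SyncInfo {
    master_ha_addr: Option<String>,
    flush_offset: u64,
}

/// Returns the HA address to use: the provided one when present,
/// otherwise whatever the hypothetical `fetch` call reports.
/// A failed fetch yields `None` instead of a made-up default.
fn resolve_ha_addr<F>(
    provided: Option<String>,
    min_broker_addr: Option<&str>,
    fetch: F,
) -> Option<String>
where
    F: FnOnce(Option<&str>) -> Result<SyncInfo, String>,
{
    match provided {
        // "Use provided HA addr" branch: no request is sent.
        Some(addr) => Some(addr),
        // "master_ha_addr is None" branch: ask the master.
        None => fetch(min_broker_addr)
            .ok()
            .and_then(|info| info.master_ha_addr),
    }
}
```

Because `fetch` is only invoked in the `None` branch, a caller that already knows the HA address never pays for the round trip.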

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I twitch my ears at Option’s gentle air,
Addresses may be none—or waiting there.
I hop to masters, fetch their HA tune,
Sync flush offsets beneath a cautious moon.
Carrot in paw, I log what’s shown—
Optional paths, precisely sewn. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The PR title accurately describes the principal change (implementing broker HA info retrieval and update logic in BrokerRuntime), as reflected by the added BrokerOuterAPI::retrieve_broker_ha_info method and the BrokerRuntime signature/logic changes in the diff; while it includes an issue reference and an emoji (minor noise), it remains specific and directly related to the changeset. |
| Linked Issues Check | ✅ Passed | The changes fulfill the primary objective of issue #4093 by adding BrokerOuterAPI::retrieve_broker_ha_info and updating BrokerRuntime to propagate optional broker addresses and conditionally fetch and synchronize HA info and flush offsets, which implements the retrieval/update behavior requested in the linked issue; the linked issue contained no further constraints, so the code-level changes meet the stated goal. |
| Out of Scope Changes Check | ✅ Passed | All modified files and the signature changes (broker_runtime.rs and broker_outer_api.rs) are directly related to propagating and retrieving broker HA information and do not introduce unrelated functionality in the provided summary, so no out-of-scope changes are evident. |
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature-4093


@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 label (Suggest an idea for this project.) Sep 22, 2025
Contributor

@Copilot Copilot AI left a comment


Pull Request Overview

This PR implements broker HA (High Availability) information retrieval and update logic in the BrokerRuntime module to support issue #4093. The changes enable brokers to exchange HA information and handle master broker state changes more effectively.

  • Added a new API method to retrieve broker HA information from master brokers
  • Updated broker runtime logic to handle optional broker addresses and improved master online/offline handling
  • Implemented synchronization of master flush offset and HA address updates when a master comes online

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| rocketmq-broker/src/out_api/broker_outer_api.rs | Added retrieve_broker_ha_info method to fetch HA information from master brokers |
| rocketmq-broker/src/broker_runtime.rs | Updated broker address handling to use Option<CheetahString> and implemented master online logic with HA info synchronization |


Comment on lines +2728 to +2738
     let need_sync_master_flush_offset =
         self.message_store_unchecked().get_master_flushed_offset() == 0
             && self.message_store_config.all_ack_in_sync_state_set;
     if master_ha_addr.is_none() || need_sync_master_flush_offset {
         match self
             .broker_outer_api
             .retrieve_broker_ha_info(min_broker_addr.as_ref())
             .await
         {
             Ok(broker_sync_info) => {
                 if need_sync_master_flush_offset {

Copilot AI Sep 22, 2025


[nitpick] The variable name need_sync_master_flush_offset is verbose and could be shortened to should_sync_flush_offset for better readability while maintaining clarity.

Suggested change
-    let need_sync_master_flush_offset =
-        self.message_store_unchecked().get_master_flushed_offset() == 0
-            && self.message_store_config.all_ack_in_sync_state_set;
-    if master_ha_addr.is_none() || need_sync_master_flush_offset {
-        match self
-            .broker_outer_api
-            .retrieve_broker_ha_info(min_broker_addr.as_ref())
-            .await
-        {
-            Ok(broker_sync_info) => {
-                if need_sync_master_flush_offset {
+    let should_sync_flush_offset =
+        self.message_store_unchecked().get_master_flushed_offset() == 0
+            && self.message_store_config.all_ack_in_sync_state_set;
+    if master_ha_addr.is_none() || should_sync_flush_offset {
+        match self
+            .broker_outer_api
+            .retrieve_broker_ha_info(min_broker_addr.as_ref())
+            .await
+        {
+            Ok(broker_sync_info) => {
+                if should_sync_flush_offset {



codecov bot commented Sep 22, 2025

Codecov Report

❌ Patch coverage is 0% with 67 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.56%. Comparing base (44d4700) to head (e4984f5).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| rocketmq-broker/src/broker_runtime.rs | 0.00% | 43 Missing ⚠️ |
| rocketmq-broker/src/out_api/broker_outer_api.rs | 0.00% | 24 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4094      +/-   ##
==========================================
- Coverage   26.58%   26.56%   -0.02%     
==========================================
  Files         575      575              
  Lines       81304    81361      +57     
==========================================
  Hits        21611    21611              
- Misses      59693    59750      +57     


Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment


LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit 0e37278 into main Sep 22, 2025
20 of 23 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added the approved label (PR has approved) and removed the ready to review and waiting-review (waiting review this PR) labels Sep 22, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
rocketmq-broker/src/out_api/broker_outer_api.rs (2)

792-820: Verify error handling for missing master broker address.

The method accepts master_broker_addr as an Option, but in the error case at line 818, it's passed to map_or which could result in an empty string in the error message when None. Consider making the error message more informative when the address is missing.

         Err(RocketmqError::MQBrokerError(
             response.code(),
             response.remark().map_or("".to_string(), |s| s.to_string()),
-            master_broker_addr.map_or("".to_string(), |s| s.to_string()),
+            master_broker_addr.map_or("<no master address>".to_string(), |s| s.to_string()),
         ))
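
The effect of the suggested placeholder can be shown in isolation. This is a minimal sketch, assuming `&str` in place of `CheetahString`; `describe_addr` is a hypothetical helper used only for illustration and does not exist in the PR.

```rust
/// Renders the address part of an error message.
/// A missing address becomes an explicit placeholder instead of "".
fn describe_addr(master_broker_addr: Option<&str>) -> String {
    master_broker_addr.map_or_else(
        // `map_or_else` builds the placeholder only when it is needed.
        || "<no master address>".to_string(),
        |s| s.to_string(),
    )
}
```

With an empty-string default, a log line for a missing address looks the same as one for a blank address; the placeholder keeps the two cases apart.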

796-800: Consider adding timeout configuration parameter.

The method uses a hardcoded timeout of 3000ms. For consistency with other methods in this file and better configurability, consider accepting a timeout parameter.

     pub async fn retrieve_broker_ha_info(
         &self,
         master_broker_addr: Option<&CheetahString>,
+        timeout_millis: u64,
     ) -> rocketmq_error::RocketMQResult<BrokerSyncInfo> {
         let request_header = ExchangeHAInfoRequestHeader::default();
         let request = RemotingCommand::create_request_command(
             RequestCode::ExchangeBrokerHaInfo,
             request_header,
         );
         let response = self
             .remoting_client
-            .invoke_async(master_broker_addr, request, 3000)
+            .invoke_async(master_broker_addr, request, timeout_millis)
             .await?;
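
One way to make the timeout configurable without forcing every caller to pick a value is an optional parameter with a default. The sketch below is an assumption layered on the suggestion above, not part of the PR: `DEFAULT_HA_INFO_TIMEOUT_MILLIS` and `effective_timeout` are hypothetical names, and the default simply mirrors the 3000 ms currently hardcoded.

```rust
use std::time::Duration;

/// Hypothetical default that mirrors the hardcoded 3000 ms.
const DEFAULT_HA_INFO_TIMEOUT_MILLIS: u64 = 3000;

/// Uses the caller-supplied timeout when given, otherwise the default.
fn effective_timeout(timeout_millis: Option<u64>) -> Duration {
    Duration::from_millis(timeout_millis.unwrap_or(DEFAULT_HA_INFO_TIMEOUT_MILLIS))
}
```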
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 44d4700 and e4984f5.

📒 Files selected for processing (2)
  • rocketmq-broker/src/broker_runtime.rs (4 hunks)
  • rocketmq-broker/src/out_api/broker_outer_api.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-broker/src/out_api/broker_outer_api.rs (3)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
  • default (144-159)
  • create_request_command (172-181)
rocketmq-remoting/src/rpc/rpc_client_impl.rs (7)
  • response (102-102)
  • response (137-137)
  • response (171-171)
  • response (205-205)
  • response (238-239)
  • response (272-273)
  • response (310-311)
rocketmq-remoting/src/code/response_code.rs (4)
  • from (29-31)
  • from (35-45)
  • from (117-119)
  • from (123-188)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (1)
  • update_min_broker (96-126)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build
  • GitHub Check: test
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: auto-approve
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (macos-latest, nightly)
🔇 Additional comments (11)
rocketmq-broker/src/out_api/broker_outer_api.rs (2)

56-59: LGTM! Clean addition of necessary imports for HA info retrieval.

The imports for BrokerSyncInfo and ExchangeHaInfoResponseHeader are properly added to support the new HA info retrieval functionality.


807-812: Good defensive programming with default value for master_flush_offset.

The use of unwrap_or(0) for master_flush_offset ensures the code is resilient to missing offset values.
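
The defaulting described here can be sketched as follows. `HaInfoHeader` is a simplified stand-in; the real `ExchangeHaInfoResponseHeader` may use different field names and types.

```rust
/// Simplified stand-in for the decoded response header (assumed shape).
struct HaInfoHeader {
    master_flush_offset: Option<i64>,
}

/// Falls back to 0 when the master did not report a flush offset.
fn flush_offset_or_zero(header: &HaInfoHeader) -> i64 {
    header.master_flush_offset.unwrap_or(0)
}
```

One trade-off to keep in mind: the snippet reviewed earlier in this thread treats `get_master_flushed_offset() == 0` as "sync still needed", so a missing offset and a genuine offset of 0 look the same after this fallback.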

rocketmq-broker/src/broker_runtime.rs (9)

2545-2546: Clean propagation of optional semantics for min_broker_addr.

The change from unwrap_or_default() to cloned() correctly propagates the Option<CheetahString> type through the system, maintaining consistency with the new optional semantics.
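
The difference between the two lookups can be seen in a small sketch. It assumes a `HashMap<u64, String>` keyed by broker id as the address table; the actual data structure and the `CheetahString` type in `broker_runtime.rs` may differ.

```rust
use std::collections::HashMap;

/// Old behaviour: a missing entry silently becomes an empty string.
fn min_addr_defaulted(addrs: &HashMap<u64, String>, min_id: u64) -> String {
    addrs.get(&min_id).cloned().unwrap_or_default()
}

/// New behaviour: a missing entry stays `None`, so callers must handle it.
fn min_addr_optional(addrs: &HashMap<u64, String>, min_id: u64) -> Option<String> {
    addrs.get(&min_id).cloned()
}
```

With the optional form, downstream code can no longer mistake an empty string for a usable address.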


2550-2553: LGTM! Consistent update of method signature.

The update_min_broker method signature properly reflects the optional nature of min_broker_addr.


2677-2682: Well-structured optional parameter handling in on_min_broker_change.

The method signature correctly accepts both min_broker_addr and master_ha_addr as Option<CheetahString>, aligning with the new optional semantics throughout the codebase.


2686-2691: Improved logging with proper Option formatting.

Using {:?} format specifier for the optional values provides clear visibility into whether the addresses are Some or None.
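
For illustration, this is how `{:?}` renders optional addresses. The message text and the `describe_change` helper are hypothetical, and `String` stands in for `CheetahString`.

```rust
/// Builds a log line; `{:?}` prints `Some("...")` or `None` for the options.
fn describe_change(
    min_broker_id: u64,
    min_broker_addr: &Option<String>,
    master_ha_addr: &Option<String>,
) -> String {
    format!(
        "min broker changed: id={}, addr={:?}, master_ha_addr={:?}",
        min_broker_id, min_broker_addr, master_ha_addr
    )
}
```

Calling it with id `0`, `Some("10.0.0.1:10911")` and `None` yields `min broker changed: id=0, addr=Some("10.0.0.1:10911"), master_ha_addr=None`.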


2695-2695: Clean state update for optional min_broker_addr.

Direct assignment of the Option<CheetahString> without unnecessary wrapping maintains type consistency.


2709-2709: Correct null-safety check for master online detection.

The condition properly checks both that it's the master (MASTER_ID) and that the address exists before proceeding with master online logic.
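
The combined check can be sketched as a small predicate. The value `0` for `MASTER_ID` is an assumption taken from RocketMQ's usual convention, and `is_master_online` is a hypothetical name; the PR performs the check inline.

```rust
/// Broker id reserved for the master (assumed to be 0).
const MASTER_ID: u64 = 0;

/// True only when the minimum broker is the master and its address is known.
fn is_master_online(min_broker_id: u64, min_broker_addr: Option<&str>) -> bool {
    min_broker_id == MASTER_ID && min_broker_addr.is_some()
}
```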


2728-2768: Excellent implementation of conditional HA info retrieval with proper error handling.

The implementation intelligently retrieves broker HA info only when necessary (missing master_ha_addr or need to sync flush offset), avoiding unnecessary network calls. The error handling and logging are comprehensive.

Key strengths:

  • Conditional retrieval logic reduces unnecessary network overhead
  • Proper error handling with informative logging
  • Clean separation between flush offset sync and HA address update logic
  • Maintains backwards compatibility by still accepting the master_ha_addr parameter
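
The retrieval condition from the reviewed snippet can be isolated as a pure function, which also makes it easy to unit test. This is a minimal sketch: `FetchDecision` and `decide` are hypothetical names, and the inputs stand in for `get_master_flushed_offset()` and `all_ack_in_sync_state_set`.

```rust
/// Outcome of the decision: whether to call the master at all, and
/// whether the flush offset should be synced from the response.
#[derive(Debug, PartialEq)]
struct FetchDecision {
    fetch: bool,
    sync_flush_offset: bool,
}

/// Mirrors the condition in the PR: sync is needed when no offset has been
/// recorded yet and all-ack mode is on; a fetch is needed when the HA
/// address is missing or a sync is needed.
fn decide(
    master_ha_addr: Option<&str>,
    master_flushed_offset: i64,
    all_ack_in_sync_state_set: bool,
) -> FetchDecision {
    let sync_flush_offset = master_flushed_offset == 0 && all_ack_in_sync_state_set;
    FetchDecision {
        fetch: master_ha_addr.is_none() || sync_flush_offset,
        sync_flush_offset,
    }
}
```

When the HA address is already known and no sync is required, `fetch` is `false` and no request is sent to the master.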

2732-2734: Verify timeout configuration for HA info retrieval.

The call to retrieve_broker_ha_info should include a timeout parameter if you accept my earlier suggestion to add one to that method.

If the timeout parameter is added to retrieve_broker_ha_info, update this call:

             match self
                 .broker_outer_api
-                .retrieve_broker_ha_info(min_broker_addr.as_ref())
+                .retrieve_broker_ha_info(min_broker_addr.as_ref(), 3000)
                 .await

2723-2726: Consistent optional parameter handling in on_master_on_line.

Both min_broker_addr and master_ha_addr are properly typed as Option<CheetahString>, maintaining consistency with the caller.

@mxsm mxsm deleted the feature-4093 branch October 8, 2025 14:37

Labels

  • AI review first (Ai review pr first)
  • approved (PR has approved)
  • auto merge
  • feature🚀 (Suggest an idea for this project.)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement broker HA info retrieval and update logic in BrokerRuntime

3 participants