Skip to content

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Sep 16, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4052

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Improved broker cluster synchronization with automatic detection of the minimum broker in a group.
    • Enhanced handling of replica liveness and membership updates from the name server.
    • Preparatory support for slave-acting-master scenarios with change notifications on minimum-broker updates.
  • Tests

    • Added unit tests to verify minimum broker ID calculation across edge cases.

@Copilot Copilot AI review requested due to automatic review settings September 16, 2025 07:46
Copy link
Contributor

coderabbitai bot commented Sep 16, 2025

Walkthrough

Adds minimum broker ID/address tracking to the broker runtime. sync_broker_member_group now fetches broker membership, updates alive replica count, determines the minimum broker, and invokes a new async update_min_broker method. BrokerMemberGroup gains minimum_broker_id. on_min_broker_change signature adjusted; implementation remains pending.

Changes

Cohort / File(s) Summary
Broker runtime: min-broker tracking and sync flow
rocketmq-broker/src/broker_runtime.rs
Introduces MASTER_ID usage and tokio::sync::Mutex; adds AtomicU64 and Mutex<Option<CheetahString>> fields for min-broker tracking; enhances sync_broker_member_group to fetch members, validate addresses, update alive replicas, compute minimum broker, and call update_min_broker; adds pub async fn update_min_broker(...); changes on_min_broker_change signature to &self, u64.
Remoting: BrokerMemberGroup utility
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs
Adds minimum_broker_id() returning the smallest broker_id or MASTER_ID if empty; imports MASTER_ID; adds unit tests covering typical, zero, and u64::MAX cases.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Timer as Periodic Task
  participant BR as BrokerRuntimeInner
  participant NS as NameSrv
  participant MS as MessageStore

  Timer->>BR: sync_broker_member_group()
  BR->>NS: get_broker_member_group()
  alt members found
    BR->>MS: update_alive_replica_count(count)
    BR->>BR: determine minimum_broker_id/address
    note over BR: Skip if MASTER_ID or isolated
    BR->>BR: update_min_broker(min_id, min_addr)
    alt min changed
      BR->>BR: on_min_broker_change(min_id, min_addr, offline_addr?)
      note over BR: offline_addr via Mutex<Option<...>>
    else no change
      BR-->>BR: return
    end
  else empty or error
    BR-->>BR: log warning/error
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I twitch my ears at minimum IDs,
Hopping through clusters with breezy ease.
A mutex burrow, atomic thump—
We track the smallest in every clump.
When masters nap and slaves awake,
I nose-tap changes for uptime’s sake. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.77% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Linked Issues Check ❓ Inconclusive Linked issue [4052] requests minimum-broker ID/address tracking; the changes add min_broker_id_in_group and min_broker_addr_in_group, a public update_min_broker API, modifications to sync_broker_member_group to compute and push updates, and BrokerMemberGroup::minimum_broker_id with unit tests, which implement the core tracking feature, but the linked issue contains no acceptance criteria and the on_min_broker_change callback remains unimplemented so I cannot confirm the runtime behavior is fully complete. Ask the author to update issue #4052 with clear acceptance criteria, document the intended behavior for on_min_broker_change, and add unit or integration tests that exercise update_min_broker and the on_min_broker_change flow so reviewers can verify full compliance.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "[ISSUE #4052]🚀Implement minimum broker ID and address tracking in broker runtime" accurately identifies the primary change—adding minimum-broker ID/address tracking in the broker runtime—and references the linked issue, so it clearly summarizes the main intent of the changeset.
Out of Scope Changes Check ✅ Passed All modified files and changes in the provided summary (broker_runtime.rs and broker_member_group.rs) are directly related to implementing minimum-broker ID/address tracking and associated APIs/tests; I did not find unrelated or out-of-scope changes in the summary.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature-4052

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements minimum broker ID and address tracking in the broker runtime to support broker group management. It adds functionality to track the minimum broker ID in a broker group and properly handle broker state changes.

  • Adds minimum_broker_id() method to BrokerMemberGroup to find the smallest broker ID
  • Implements broker member group synchronization in BrokerRuntime with minimum broker tracking
  • Updates broker runtime structure to store minimum broker ID and address with atomic operations

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs Adds minimum_broker_id method and comprehensive tests
rocketmq-broker/src/broker_runtime.rs Implements sync_broker_member_group and update_min_broker methods with tracking fields

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

if min_broker_id > min_broker_id_in_group {
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
}
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing .await keyword for the async method call. The on_min_broker_change method appears to be async based on the context, but it's being called without awaiting the result.

Suggested change
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None).await

Copilot uses AI. Check for mistakes.

if min_broker_id > min_broker_id_in_group {
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
}
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimum broker ID and address are updated in memory but never stored back to the atomic fields. After calling on_min_broker_change, you should update self.min_broker_id_in_group.store(min_broker_id, Ordering::SeqCst) and *self.min_broker_addr_in_group.lock().await = Some(min_broker_addr.clone()).

Suggested change
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
self.on_min_broker_change(min_broker_id, min_broker_addr.clone(), offline_broker_addr, None);
self.min_broker_id_in_group.store(min_broker_id, Ordering::SeqCst);
*self.min_broker_addr_in_group.lock().await = Some(min_broker_addr.clone());

Copilot uses AI. Check for mistakes.

Copy link

codecov bot commented Sep 16, 2025

Codecov Report

❌ Patch coverage is 32.69231% with 70 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.52%. Comparing base (22fa414) to head (fafd00d).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rocketmq-broker/src/broker_runtime.rs 0.00% 69 Missing ⚠️
...c/protocol/body/broker_body/broker_member_group.rs 97.14% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##             main    #4053    +/-   ##
========================================
  Coverage   26.52%   26.52%            
========================================
  Files         570      570            
  Lines       80922    81023   +101     
========================================
+ Hits        21461    21495    +34     
- Misses      59461    59528    +67     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit 8c4201c into main Sep 16, 2025
23 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Sep 16, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rocketmq-broker/src/broker_runtime.rs (1)

2649-2654: Critical: unimplemented! will panic in production

on_min_broker_change is invoked by update_min_broker and currently panics. Replace with a non-panicking stub (log-only) until implemented.

Apply this diff:

-    fn on_min_broker_change(
-        &self,
-        _min_broker_id: u64,
-        _min_broker_addr: CheetahString,
-        _offline_broker_addr: Option<CheetahString>,
-        _master_ha_addr: Option<CheetahString>,
-    ) {
-        unimplemented!("BrokerRuntimeInner#on_min_broker_change");
-    }
+    fn on_min_broker_change(
+        &self,
+        _min_broker_id: u64,
+        _min_broker_addr: CheetahString,
+        _offline_broker_addr: Option<CheetahString>,
+        _master_ha_addr: Option<CheetahString>,
+    ) {
+        info!("on_min_broker_change invoked (stub). TODO: implement behavior.");
+    }
🧹 Nitpick comments (4)
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (2)

41-47: Simplify min-id logic; consider documenting fallback semantics

The logic is correct. You can simplify and make intent clearer by using unwrap_or. Also, please document that an empty map falls back to MASTER_ID to avoid misuse elsewhere.

Apply this diff:

-    pub fn minimum_broker_id(&self) -> u64 {
-        if self.broker_addrs.is_empty() {
-            MASTER_ID
-        } else {
-            *self.broker_addrs.keys().min().unwrap()
-        }
-    }
+    pub fn minimum_broker_id(&self) -> u64 {
+        self.broker_addrs.keys().copied().min().unwrap_or(MASTER_ID)
+    }

Optionally add a doc comment: “Returns the smallest broker_id or MASTER_ID when empty.”


139-175: Tests cover key paths; add empty-map case?

Good coverage for normal and edge values. Consider a tiny test asserting empty map returns MASTER_ID to pin the fallback.

rocketmq-broker/src/broker_runtime.rs (2)

244-246: Initialize min-broker state to local identity/address

Starting these with real values avoids a None offline address on the first transition and reduces spurious “change” detection.

Apply this diff:

-            min_broker_id_in_group: AtomicU64::new(0),
-            min_broker_addr_in_group: Default::default(),
+            min_broker_id_in_group: AtomicU64::new(broker_config.broker_identity.broker_id),
+            min_broker_addr_in_group: Mutex::new(Some(broker_config.get_broker_addr().into())),

2476-2534: Tighten control flow; avoid double unwraps and unreachable default

Current code unwraps twice and computes a default address that cannot occur (min ID always exists when map is non-empty). Minor cleanup improves readability and safety.

Apply this diff:

-        let broker_member_group = self
-            .broker_outer_api
-            .sync_broker_member_group(
-                broker_cluster_name,
-                broker_name,
-                compatible_with_old_name_srv,
-            )
-            .await;
-
-        if let Err(ref e) = broker_member_group {
-            error!("syncBrokerMemberGroup from namesrv failed, error={}", e);
-            return;
-        }
-        let broker_member_group = broker_member_group.unwrap();
-        if broker_member_group.is_none()
-            || broker_member_group
-                .as_ref()
-                .unwrap()
-                .broker_addrs
-                .is_empty()
-        {
-            warn!(
-                "Couldn't find any broker member from namesrv in {}/{}",
-                broker_cluster_name, broker_name
-            );
-            return;
-        }
+        let result = self
+            .broker_outer_api
+            .sync_broker_member_group(
+                broker_cluster_name,
+                broker_name,
+                compatible_with_old_name_srv,
+            )
+            .await;
+
+        let Some(broker_member_group) = match result {
+            Ok(group @ Some(_)) => group,
+            Ok(None) => {
+                warn!(
+                    "Couldn't find any broker member from namesrv in {}/{}",
+                    broker_cluster_name, broker_name
+                );
+                return;
+            }
+            Err(e) => {
+                error!("syncBrokerMemberGroup from namesrv failed, error={}", e);
+                return;
+            }
+        };
@@
-        let broker_member_group = broker_member_group.unwrap();
         self.message_store_unchecked()
             .set_alive_replica_num_in_group(calc_alive_broker_num_in_group(
                 &broker_member_group.broker_addrs,
                 self.broker_config.broker_identity.broker_id,
             ) as i32);
         if !self.is_isolated.load(Ordering::Acquire) {
             let min_broker_id = broker_member_group.minimum_broker_id();
-            let min_broker_addr = broker_member_group
-                .broker_addrs
-                .get(&min_broker_id)
-                .cloned()
-                .unwrap_or_default();
+            let min_broker_addr = broker_member_group
+                .broker_addrs
+                .get(&min_broker_id)
+                .cloned()
+                .expect("min broker id must exist in broker_addrs");
             self.update_min_broker(min_broker_id, min_broker_addr).await;
         }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 22fa414 and fafd00d.

📒 Files selected for processing (2)
  • rocketmq-broker/src/broker_runtime.rs (6 hunks)
  • rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (2)
rocketmq-broker/src/broker_runtime.rs (1)
  • new (151-282)
rocketmq-common/src/common/broker/broker_config.rs (1)
  • new (402-415)
rocketmq-broker/src/broker_runtime.rs (2)
rocketmq-common/src/common/broker/broker_config.rs (8)
  • new (402-415)
  • default (396-398)
  • default (775-894)
  • default (1205-1209)
  • broker_cluster_name (59-61)
  • broker_name (55-57)
  • broker_name (898-900)
  • broker_id (63-65)
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (1)
  • update_min_broker (97-127)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build
  • GitHub Check: test
  • GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (1)

20-20: LGTM: MASTER_ID import

Import is correct and scoped to the single defaulting use.

Comment on lines +2536 to 2549
pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
if self.broker_config.enable_slave_acting_master
&& self.broker_config.broker_identity.broker_id != MASTER_ID
{
let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
if min_broker_id != min_broker_id_in_group {
let mut offline_broker_addr = None;
if min_broker_id > min_broker_id_in_group {
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
}
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Bug: state not updated; repeated events; and potential panic downstream

update_min_broker never stores the new min ID or address. This causes repeated change detections and breaks offline detection. Fix by storing atomically and updating the address under the mutex before invoking the callback. Use Acquire/Release rather than SeqCst here.

Apply this diff:

-    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
-        if self.broker_config.enable_slave_acting_master
-            && self.broker_config.broker_identity.broker_id != MASTER_ID
-        {
-            let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
-            if min_broker_id != min_broker_id_in_group {
-                let mut offline_broker_addr = None;
-                if min_broker_id > min_broker_id_in_group {
-                    offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
-                }
-                self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
-            }
-        }
-    }
+    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+        if !(self.broker_config.enable_slave_acting_master
+            && self.broker_config.broker_identity.broker_id != MASTER_ID)
+        {
+            return;
+        }
+        let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+        let mut offline_broker_addr = None;
+        {
+            let mut guard = self.min_broker_addr_in_group.lock().await;
+            if min_broker_id > prev_id {
+                offline_broker_addr = guard.clone();
+            }
+            *guard = Some(min_broker_addr.clone());
+        }
+        if min_broker_id != prev_id {
+            self.min_broker_id_in_group
+                .store(min_broker_id, Ordering::Release);
+            self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+        }
+    }

Additionally, get_min_broker_id_in_group currently returns the local broker_id, not the tracked minimum. Please update it accordingly (outside this hunk):

pub fn get_min_broker_id_in_group(&self) -> u64 {
    self.min_broker_id_in_group.load(Ordering::Acquire)
}

🏁 Script executed:

#!/bin/bash
# Verify call sites and ensure semantics rely on the tracked min-id, not local broker_id.
rg -n -C2 'get_min_broker_id_in_group\('
rg -n -C2 'update_min_broker\s*\('
rg -n -C2 'on_min_broker_change\s*\('

Length of output: 8218


Critical: persist min-broker state atomically before invoking on_min_broker_change

  • update_min_broker never updates the tracked min ID/address, causing repeated change events and broken offline detection; write the new min ID (Acquire/Release) and update min_broker_addr_in_group under the mutex before calling on_min_broker_change.
  • get_min_broker_id_in_group currently returns the local broker_id — change it to return the tracked atomic min ID.

Locations: rocketmq-broker/src/broker_runtime.rs (update_min_broker ≈ lines 2536–2549; get_min_broker_id_in_group ≈ lines 2251–2253). Callers: processor/admin_broker_processor/notify_min_broker_id_handler.rs and processor/default_pull_message_result_handler.rs.

Apply this diff:

-    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
-        if self.broker_config.enable_slave_acting_master
-            && self.broker_config.broker_identity.broker_id != MASTER_ID
-        {
-            let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
-            if min_broker_id != min_broker_id_in_group {
-                let mut offline_broker_addr = None;
-                if min_broker_id > min_broker_id_in_group {
-                    offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
-                }
-                self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
-            }
-        }
-    }
+    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+        if !(self.broker_config.enable_slave_acting_master
+            && self.broker_config.broker_identity.broker_id != MASTER_ID)
+        {
+            return;
+        }
+        let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+        let mut offline_broker_addr = None;
+        {
+            let mut guard = self.min_broker_addr_in_group.lock().await;
+            if min_broker_id > prev_id {
+                offline_broker_addr = guard.clone();
+            }
+            *guard = Some(min_broker_addr.clone());
+        }
+        if min_broker_id != prev_id {
+            self.min_broker_id_in_group
+                .store(min_broker_id, Ordering::Release);
+            self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+        }
+    }

Additionally, update get_min_broker_id_in_group to:

pub fn get_min_broker_id_in_group(&self) -> u64 {
    self.min_broker_id_in_group.load(Ordering::Acquire)
}
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 2536–2549 and
get_min_broker_id_in_group at ~2251–2253, update_min_broker must persist the new
min state atomically before calling on_min_broker_change: acquire the mutex on
min_broker_addr_in_group, update/replace the stored address there, then store
the new min_broker_id into min_broker_id_in_group using an atomic store with
Release ordering (so readers using Acquire see the update), and only after both
the mutex-held address update and atomic store complete call
on_min_broker_change; also change get_min_broker_id_in_group to return
self.min_broker_id_in_group.load(Ordering::Acquire) instead of the local
broker_id so callers observe the tracked min ID.

@mxsm mxsm deleted the feature-4052 branch September 19, 2025 16:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement minimum broker ID and address tracking in broker runtime

3 participants