-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4052]🚀Implement minimum broker ID and address tracking in broker runtime #4053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds minimum broker ID/address tracking to the broker runtime. sync_broker_member_group now fetches broker membership, updates alive replica count, determines the minimum broker, and invokes a new async update_min_broker method. BrokerMemberGroup gains minimum_broker_id. on_min_broker_change signature adjusted; implementation remains pending. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Timer as Periodic Task
participant BR as BrokerRuntimeInner
participant NS as NameSrv
participant MS as MessageStore
Timer->>BR: sync_broker_member_group()
BR->>NS: get_broker_member_group()
alt members found
BR->>MS: update_alive_replica_count(count)
BR->>BR: determine minimum_broker_id/address
note over BR: Skip if MASTER_ID or isolated
BR->>BR: update_min_broker(min_id, min_addr)
alt min changed
BR->>BR: on_min_broker_change(min_id, min_addr, offline_addr?)
note over BR: offline_addr via Mutex<Option<...>>
else no change
BR-->>BR: return
end
else empty or error
BR-->>BR: log warning/error
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements minimum broker ID and address tracking in the broker runtime to support broker group management. It adds functionality to track the minimum broker ID in a broker group and properly handle broker state changes.
- Adds
minimum_broker_id()
method toBrokerMemberGroup
to find the smallest broker ID - Implements broker member group synchronization in
BrokerRuntime
with minimum broker tracking - Updates broker runtime structure to store minimum broker ID and address with atomic operations
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs | Adds minimum_broker_id method and comprehensive tests |
rocketmq-broker/src/broker_runtime.rs | Implements sync_broker_member_group and update_min_broker methods with tracking fields |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
if min_broker_id > min_broker_id_in_group { | ||
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone(); | ||
} | ||
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) |
Copilot
AI
Sep 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing .await
keyword for the async method call. The on_min_broker_change
method appears to be async based on the context, but it's being called without awaiting the result.
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) | |
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None).await |
Copilot uses AI. Check for mistakes.
if min_broker_id > min_broker_id_in_group { | ||
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone(); | ||
} | ||
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) |
Copilot
AI
Sep 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The minimum broker ID and address are updated in memory but never stored back to the atomic fields. After calling on_min_broker_change
, you should update self.min_broker_id_in_group.store(min_broker_id, Ordering::SeqCst)
and *self.min_broker_addr_in_group.lock().await = Some(min_broker_addr.clone())
.
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) | |
self.on_min_broker_change(min_broker_id, min_broker_addr.clone(), offline_broker_addr, None); | |
self.min_broker_id_in_group.store(min_broker_id, Ordering::SeqCst); | |
*self.min_broker_addr_in_group.lock().await = Some(min_broker_addr.clone()); |
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4053 +/- ##
========================================
Coverage 26.52% 26.52%
========================================
Files 570 570
Lines 80922 81023 +101
========================================
+ Hits 21461 21495 +34
- Misses 59461 59528 +67 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-broker/src/broker_runtime.rs (1)
2649-2654
: Critical: unimplemented! will panic in productionon_min_broker_change is invoked by update_min_broker and currently panics. Replace with a non-panicking stub (log-only) until implemented.
Apply this diff:
- fn on_min_broker_change( - &self, - _min_broker_id: u64, - _min_broker_addr: CheetahString, - _offline_broker_addr: Option<CheetahString>, - _master_ha_addr: Option<CheetahString>, - ) { - unimplemented!("BrokerRuntimeInner#on_min_broker_change"); - } + fn on_min_broker_change( + &self, + _min_broker_id: u64, + _min_broker_addr: CheetahString, + _offline_broker_addr: Option<CheetahString>, + _master_ha_addr: Option<CheetahString>, + ) { + info!("on_min_broker_change invoked (stub). TODO: implement behavior."); + }
🧹 Nitpick comments (4)
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (2)
41-47
: Simplify min-id logic; consider documenting fallback semanticsThe logic is correct. You can simplify and make intent clearer by using unwrap_or. Also, please document that an empty map falls back to MASTER_ID to avoid misuse elsewhere.
Apply this diff:
- pub fn minimum_broker_id(&self) -> u64 { - if self.broker_addrs.is_empty() { - MASTER_ID - } else { - *self.broker_addrs.keys().min().unwrap() - } - } + pub fn minimum_broker_id(&self) -> u64 { + self.broker_addrs.keys().copied().min().unwrap_or(MASTER_ID) + }Optionally add a doc comment: “Returns the smallest broker_id or MASTER_ID when empty.”
139-175
: Tests cover key paths; add empty-map case?Good coverage for normal and edge values. Consider a tiny test asserting empty map returns MASTER_ID to pin the fallback.
rocketmq-broker/src/broker_runtime.rs (2)
244-246
: Initialize min-broker state to local identity/addressStarting these with real values avoids a None offline address on the first transition and reduces spurious “change” detection.
Apply this diff:
- min_broker_id_in_group: AtomicU64::new(0), - min_broker_addr_in_group: Default::default(), + min_broker_id_in_group: AtomicU64::new(broker_config.broker_identity.broker_id), + min_broker_addr_in_group: Mutex::new(Some(broker_config.get_broker_addr().into())),
2476-2534
: Tighten control flow; avoid double unwraps and unreachable defaultCurrent code unwraps twice and computes a default address that cannot occur (min ID always exists when map is non-empty). Minor cleanup improves readability and safety.
Apply this diff:
- let broker_member_group = self - .broker_outer_api - .sync_broker_member_group( - broker_cluster_name, - broker_name, - compatible_with_old_name_srv, - ) - .await; - - if let Err(ref e) = broker_member_group { - error!("syncBrokerMemberGroup from namesrv failed, error={}", e); - return; - } - let broker_member_group = broker_member_group.unwrap(); - if broker_member_group.is_none() - || broker_member_group - .as_ref() - .unwrap() - .broker_addrs - .is_empty() - { - warn!( - "Couldn't find any broker member from namesrv in {}/{}", - broker_cluster_name, broker_name - ); - return; - } + let result = self + .broker_outer_api + .sync_broker_member_group( + broker_cluster_name, + broker_name, + compatible_with_old_name_srv, + ) + .await; + + let Some(broker_member_group) = match result { + Ok(group @ Some(_)) => group, + Ok(None) => { + warn!( + "Couldn't find any broker member from namesrv in {}/{}", + broker_cluster_name, broker_name + ); + return; + } + Err(e) => { + error!("syncBrokerMemberGroup from namesrv failed, error={}", e); + return; + } + }; @@ - let broker_member_group = broker_member_group.unwrap(); self.message_store_unchecked() .set_alive_replica_num_in_group(calc_alive_broker_num_in_group( &broker_member_group.broker_addrs, self.broker_config.broker_identity.broker_id, ) as i32); if !self.is_isolated.load(Ordering::Acquire) { let min_broker_id = broker_member_group.minimum_broker_id(); - let min_broker_addr = broker_member_group - .broker_addrs - .get(&min_broker_id) - .cloned() - .unwrap_or_default(); + let min_broker_addr = broker_member_group + .broker_addrs + .get(&min_broker_id) + .cloned() + .expect("min broker id must exist in broker_addrs"); self.update_min_broker(min_broker_id, min_broker_addr).await; }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-broker/src/broker_runtime.rs
(6 hunks)rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (2)
rocketmq-broker/src/broker_runtime.rs (1)
new
(151-282)rocketmq-common/src/common/broker/broker_config.rs (1)
new
(402-415)
rocketmq-broker/src/broker_runtime.rs (2)
rocketmq-common/src/common/broker/broker_config.rs (8)
new
(402-415)default
(396-398)default
(775-894)default
(1205-1209)broker_cluster_name
(59-61)broker_name
(55-57)broker_name
(898-900)broker_id
(63-65)rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs (1)
update_min_broker
(97-127)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs (1)
20-20
: LGTM: MASTER_ID importImport is correct and scoped to the single defaulting use.
pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) { | ||
if self.broker_config.enable_slave_acting_master | ||
&& self.broker_config.broker_identity.broker_id != MASTER_ID | ||
{ | ||
let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst); | ||
if min_broker_id != min_broker_id_in_group { | ||
let mut offline_broker_addr = None; | ||
if min_broker_id > min_broker_id_in_group { | ||
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone(); | ||
} | ||
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Bug: state not updated; repeated events; and potential panic downstream
update_min_broker never stores the new min ID or address. This causes repeated change detections and breaks offline detection. Fix by storing atomically and updating the address under the mutex before invoking the callback. Use Acquire/Release rather than SeqCst here.
Apply this diff:
- pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
- if self.broker_config.enable_slave_acting_master
- && self.broker_config.broker_identity.broker_id != MASTER_ID
- {
- let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
- if min_broker_id != min_broker_id_in_group {
- let mut offline_broker_addr = None;
- if min_broker_id > min_broker_id_in_group {
- offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
- }
- self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
- }
- }
- }
+ pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+ if !(self.broker_config.enable_slave_acting_master
+ && self.broker_config.broker_identity.broker_id != MASTER_ID)
+ {
+ return;
+ }
+ let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+ let mut offline_broker_addr = None;
+ {
+ let mut guard = self.min_broker_addr_in_group.lock().await;
+ if min_broker_id > prev_id {
+ offline_broker_addr = guard.clone();
+ }
+ *guard = Some(min_broker_addr.clone());
+ }
+ if min_broker_id != prev_id {
+ self.min_broker_id_in_group
+ .store(min_broker_id, Ordering::Release);
+ self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+ }
+ }
Additionally, get_min_broker_id_in_group currently returns the local broker_id, not the tracked minimum. Please update it accordingly (outside this hunk):
pub fn get_min_broker_id_in_group(&self) -> u64 {
self.min_broker_id_in_group.load(Ordering::Acquire)
}
🏁 Script executed:
#!/bin/bash
# Verify call sites and ensure semantics rely on the tracked min-id, not local broker_id.
rg -n -C2 'get_min_broker_id_in_group\('
rg -n -C2 'update_min_broker\s*\('
rg -n -C2 'on_min_broker_change\s*\('
Length of output: 8218
Critical: persist min-broker state atomically before invoking on_min_broker_change
- update_min_broker never updates the tracked min ID/address, causing repeated change events and broken offline detection; write the new min ID (Acquire/Release) and update min_broker_addr_in_group under the mutex before calling on_min_broker_change.
- get_min_broker_id_in_group currently returns the local broker_id — change it to return the tracked atomic min ID.
Locations: rocketmq-broker/src/broker_runtime.rs (update_min_broker ≈ lines 2536–2549; get_min_broker_id_in_group ≈ lines 2251–2253). Callers: processor/admin_broker_processor/notify_min_broker_id_handler.rs and processor/default_pull_message_result_handler.rs.
Apply this diff:
- pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
- if self.broker_config.enable_slave_acting_master
- && self.broker_config.broker_identity.broker_id != MASTER_ID
- {
- let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
- if min_broker_id != min_broker_id_in_group {
- let mut offline_broker_addr = None;
- if min_broker_id > min_broker_id_in_group {
- offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
- }
- self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
- }
- }
- }
+ pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+ if !(self.broker_config.enable_slave_acting_master
+ && self.broker_config.broker_identity.broker_id != MASTER_ID)
+ {
+ return;
+ }
+ let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+ let mut offline_broker_addr = None;
+ {
+ let mut guard = self.min_broker_addr_in_group.lock().await;
+ if min_broker_id > prev_id {
+ offline_broker_addr = guard.clone();
+ }
+ *guard = Some(min_broker_addr.clone());
+ }
+ if min_broker_id != prev_id {
+ self.min_broker_id_in_group
+ .store(min_broker_id, Ordering::Release);
+ self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+ }
+ }
Additionally, update get_min_broker_id_in_group to:
pub fn get_min_broker_id_in_group(&self) -> u64 {
self.min_broker_id_in_group.load(Ordering::Acquire)
}
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 2536–2549 and
get_min_broker_id_in_group at ~2251–2253, update_min_broker must persist the new
min state atomically before calling on_min_broker_change: acquire the mutex on
min_broker_addr_in_group, update/replace the stored address there, then store
the new min_broker_id into min_broker_id_in_group using an atomic store with
Release ordering (so readers using Acquire see the update), and only after both
the mutex-held address update and atomic store complete call
on_min_broker_change; also change get_min_broker_id_in_group to return
self.min_broker_id_in_group.load(Ordering::Acquire) instead of the local
broker_id so callers observe the tracked min ID.
Which Issue(s) This PR Fixes(Closes)
Fixes #4052
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Tests