Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::common::constant::PermName;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::mix_all::MASTER_ID;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_common::common::statistics::state_getter::StateGetter;
use rocketmq_common::TimeUtils::get_current_millis;
Expand All @@ -55,6 +56,7 @@ use rocketmq_store::message_store::local_file_message_store::LocalFileMessageSto
use rocketmq_store::stats::broker_stats::BrokerStats;
use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager;
use rocketmq_store::timer::timer_message_store::TimerMessageStore;
use tokio::sync::Mutex;
use tracing::error;
use tracing::info;
use tracing::warn;
Expand Down Expand Up @@ -239,6 +241,8 @@ impl BrokerRuntime {
slave_synchronize: None,
last_sync_time_ms: AtomicU64::new(get_current_millis()),
broker_pre_online_service: None,
min_broker_id_in_group: AtomicU64::new(0),
min_broker_addr_in_group: Default::default(),
});
let mut stats_manager = BrokerStatsManager::new(inner.broker_config.clone());
stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter {
Expand Down Expand Up @@ -1610,6 +1614,8 @@ pub(crate) struct BrokerRuntimeInner<MS: MessageStore> {
slave_synchronize: Option<SlaveSynchronize<MS>>,
last_sync_time_ms: AtomicU64,
broker_pre_online_service: Option<BrokerPreOnlineService<MS>>,
min_broker_id_in_group: AtomicU64,
min_broker_addr_in_group: Mutex<Option<CheetahString>>,
}

impl<MS: MessageStore> BrokerRuntimeInner<MS> {
Expand Down Expand Up @@ -2467,7 +2473,79 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
}

/// Synchronizes this broker's view of its broker group with the name server.
///
/// Fetches the member group for this broker's cluster/name through
/// `broker_outer_api`, records the alive replica count on the message store and,
/// unless the broker is flagged as isolated, forwards the group's minimum broker
/// id and its address to `update_min_broker`.
///
/// The method returns early after logging when the lookup fails (`error!`) or
/// when the name server reports no members for the group (`warn!`).
async fn sync_broker_member_group(&self) {
    /// Size of the group's address table, plus one when `broker_id` itself is
    /// not present in the table.
    fn calc_alive_broker_num_in_group(
        broker_addr_table: &HashMap<
            u64,           /* brokerId */
            CheetahString, /* broker address */
        >,
        broker_id: u64,
    ) -> usize {
        if broker_addr_table.contains_key(&broker_id) {
            broker_addr_table.len()
        } else {
            broker_addr_table.len() + 1
        }
    }

    let broker_cluster_name = &self.broker_config.broker_identity.broker_cluster_name;
    let broker_name = &self.broker_config.broker_identity.broker_name;
    let compatible_with_old_name_srv = self.broker_config.compatible_with_old_name_srv;
    let sync_result = self
        .broker_outer_api
        .sync_broker_member_group(
            broker_cluster_name,
            broker_name,
            compatible_with_old_name_srv,
        )
        .await;

    // One exhaustive match replaces the is_err/unwrap and is_none/unwrap chain,
    // so there is no panic path between the checks and the use of the group.
    let broker_member_group = match sync_result {
        Ok(Some(group)) if !group.broker_addrs.is_empty() => group,
        Ok(_) => {
            warn!(
                "Couldn't find any broker member from namesrv in {}/{}",
                broker_cluster_name, broker_name
            );
            return;
        }
        Err(e) => {
            error!("syncBrokerMemberGroup from namesrv failed, error={}", e);
            return;
        }
    };

    let alive_broker_num = calc_alive_broker_num_in_group(
        &broker_member_group.broker_addrs,
        self.broker_config.broker_identity.broker_id,
    );
    // The store setter takes an i32; the count is cast exactly as before.
    self.message_store_unchecked()
        .set_alive_replica_num_in_group(alive_broker_num as i32);

    if !self.is_isolated.load(Ordering::Acquire) {
        let min_broker_id = broker_member_group.minimum_broker_id();
        let min_broker_addr = broker_member_group
            .broker_addrs
            .get(&min_broker_id)
            .cloned()
            .unwrap_or_default();
        self.update_min_broker(min_broker_id, min_broker_addr).await;
    }
}

pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
if self.broker_config.enable_slave_acting_master
&& self.broker_config.broker_identity.broker_id != MASTER_ID
{
let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
if min_broker_id != min_broker_id_in_group {
let mut offline_broker_addr = None;
if min_broker_id > min_broker_id_in_group {
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
}
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing .await keyword for the async method call. The on_min_broker_change method appears to be async based on the context, but it's being called without awaiting the result.

Suggested change
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None).await

Copilot uses AI. Check for mistakes.

Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new minimum broker ID and address are passed to on_min_broker_change but are never stored back into the tracking fields (the atomic min_broker_id_in_group and the mutex-guarded min_broker_addr_in_group). After calling on_min_broker_change, you should update self.min_broker_id_in_group.store(min_broker_id, Ordering::SeqCst) and *self.min_broker_addr_in_group.lock().await = Some(min_broker_addr.clone()).

Suggested change
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
self.on_min_broker_change(min_broker_id, min_broker_addr.clone(), offline_broker_addr, None);
self.min_broker_id_in_group.store(min_broker_id, Ordering::SeqCst);
*self.min_broker_addr_in_group.lock().await = Some(min_broker_addr.clone());

Copilot uses AI. Check for mistakes.

}
}
}
Comment on lines +2536 to 2549
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Bug: state not updated; repeated events; and potential panic downstream

update_min_broker never stores the new min ID or address. This causes repeated change detections and breaks offline detection. Fix by storing atomically and updating the address under the mutex before invoking the callback. Use Acquire/Release rather than SeqCst here.

Apply this diff:

-    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
-        if self.broker_config.enable_slave_acting_master
-            && self.broker_config.broker_identity.broker_id != MASTER_ID
-        {
-            let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
-            if min_broker_id != min_broker_id_in_group {
-                let mut offline_broker_addr = None;
-                if min_broker_id > min_broker_id_in_group {
-                    offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
-                }
-                self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
-            }
-        }
-    }
+    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+        if !(self.broker_config.enable_slave_acting_master
+            && self.broker_config.broker_identity.broker_id != MASTER_ID)
+        {
+            return;
+        }
+        let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+        let mut offline_broker_addr = None;
+        {
+            let mut guard = self.min_broker_addr_in_group.lock().await;
+            if min_broker_id > prev_id {
+                offline_broker_addr = guard.clone();
+            }
+            *guard = Some(min_broker_addr.clone());
+        }
+        if min_broker_id != prev_id {
+            self.min_broker_id_in_group
+                .store(min_broker_id, Ordering::Release);
+            self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+        }
+    }

Additionally, get_min_broker_id_in_group currently returns the local broker_id, not the tracked minimum. Please update it accordingly (outside this hunk):

pub fn get_min_broker_id_in_group(&self) -> u64 {
    self.min_broker_id_in_group.load(Ordering::Acquire)
}

🏁 Script executed:

#!/bin/bash
# Verify call sites and ensure semantics rely on the tracked min-id, not local broker_id.
rg -n -C2 'get_min_broker_id_in_group\('
rg -n -C2 'update_min_broker\s*\('
rg -n -C2 'on_min_broker_change\s*\('

Length of output: 8218


Critical: persist min-broker state atomically before invoking on_min_broker_change

  • update_min_broker never updates the tracked min ID/address, causing repeated change events and broken offline detection; write the new min ID (Acquire/Release) and update min_broker_addr_in_group under the mutex before calling on_min_broker_change.
  • get_min_broker_id_in_group currently returns the local broker_id — change it to return the tracked atomic min ID.

Locations: rocketmq-broker/src/broker_runtime.rs (update_min_broker ≈ lines 2536–2549; get_min_broker_id_in_group ≈ lines 2251–2253). Callers: processor/admin_broker_processor/notify_min_broker_id_handler.rs and processor/default_pull_message_result_handler.rs.

Apply this diff:

-    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
-        if self.broker_config.enable_slave_acting_master
-            && self.broker_config.broker_identity.broker_id != MASTER_ID
-        {
-            let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
-            if min_broker_id != min_broker_id_in_group {
-                let mut offline_broker_addr = None;
-                if min_broker_id > min_broker_id_in_group {
-                    offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
-                }
-                self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
-            }
-        }
-    }
+    pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+        if !(self.broker_config.enable_slave_acting_master
+            && self.broker_config.broker_identity.broker_id != MASTER_ID)
+        {
+            return;
+        }
+        let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+        let mut offline_broker_addr = None;
+        {
+            let mut guard = self.min_broker_addr_in_group.lock().await;
+            if min_broker_id > prev_id {
+                offline_broker_addr = guard.clone();
+            }
+            *guard = Some(min_broker_addr.clone());
+        }
+        if min_broker_id != prev_id {
+            self.min_broker_id_in_group
+                .store(min_broker_id, Ordering::Release);
+            self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+        }
+    }

Additionally, update get_min_broker_id_in_group to:

pub fn get_min_broker_id_in_group(&self) -> u64 {
    self.min_broker_id_in_group.load(Ordering::Acquire)
}
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 2536–2549 and
get_min_broker_id_in_group at ~2251–2253, update_min_broker must persist the new
min state atomically before calling on_min_broker_change: acquire the mutex on
min_broker_addr_in_group, update/replace the stored address there, then store
the new min_broker_id into min_broker_id_in_group using an atomic store with
Release ordering (so readers using Acquire see the update), and only after both
the mutex-held address update and atomic store complete call
on_min_broker_change; also change get_min_broker_id_in_group to return
self.min_broker_id_in_group.load(Ordering::Acquire) instead of the local
broker_id so callers observe the tracked min ID.


pub fn pop_message_processor_unchecked(&self) -> &ArcMut<PopMessageProcessor<MS>> {
Expand Down Expand Up @@ -2568,8 +2646,8 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
}

fn on_min_broker_change(
&mut self,
_min_broker_id: i64,
&self,
_min_broker_id: u64,
_min_broker_addr: CheetahString,
_offline_broker_addr: Option<CheetahString>,
_master_ha_addr: Option<CheetahString>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::collections::HashMap;

use cheetah_string::CheetahString;
use rocketmq_common::common::mix_all::MASTER_ID;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -36,6 +37,14 @@ impl BrokerMemberGroup {
broker_addrs: HashMap::new(),
}
}

/// Returns the smallest broker id registered in `broker_addrs`.
///
/// Falls back to `MASTER_ID` when the group has no members.
pub fn minimum_broker_id(&self) -> u64 {
    self.broker_addrs
        .keys()
        .min()
        .copied()
        .unwrap_or(MASTER_ID)
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -126,4 +135,42 @@ mod tests {
assert_eq!(group.broker_name, CheetahString::from("test_broker"));
assert!(group.broker_addrs.is_empty());
}

#[test]
fn minimum_broker_id_returns_smallest_broker_id_when_present() {
    let mut group = BrokerMemberGroup::new(
        CheetahString::from("cluster"),
        CheetahString::from("broker"),
    );
    // Ids are inserted out of order so the result cannot rely on insertion order.
    for (broker_id, addr) in [
        (3, "127.0.0.1:10913"),
        (1, "127.0.0.1:10911"),
        (2, "127.0.0.1:10912"),
    ] {
        group
            .broker_addrs
            .insert(broker_id, CheetahString::from(addr));
    }

    assert_eq!(group.minimum_broker_id(), 1);
}

#[test]
fn minimum_broker_id_handles_zero_and_large_ids() {
    let mut group = BrokerMemberGroup::new(
        CheetahString::from("cluster"),
        CheetahString::from("broker"),
    );
    // Covers both ends of the u64 range alongside an ordinary id.
    for (broker_id, addr) in [
        (0, "127.0.0.1:10910"),
        (42, "127.0.0.1:10942"),
        (u64::MAX, "127.0.0.1:12000"),
    ] {
        group
            .broker_addrs
            .insert(broker_id, CheetahString::from(addr));
    }

    assert_eq!(group.minimum_broker_id(), 0);
}
}
Loading