-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4052]🚀Implement minimum broker ID and address tracking in broker runtime #4053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -29,6 +29,7 @@ use rocketmq_common::common::config::TopicConfig; | |||||||||
use rocketmq_common::common::config_manager::ConfigManager; | ||||||||||
use rocketmq_common::common::constant::PermName; | ||||||||||
use rocketmq_common::common::mix_all; | ||||||||||
use rocketmq_common::common::mix_all::MASTER_ID; | ||||||||||
use rocketmq_common::common::server::config::ServerConfig; | ||||||||||
use rocketmq_common::common::statistics::state_getter::StateGetter; | ||||||||||
use rocketmq_common::TimeUtils::get_current_millis; | ||||||||||
|
@@ -55,6 +56,7 @@ use rocketmq_store::message_store::local_file_message_store::LocalFileMessageSto | |||||||||
use rocketmq_store::stats::broker_stats::BrokerStats; | ||||||||||
use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager; | ||||||||||
use rocketmq_store::timer::timer_message_store::TimerMessageStore; | ||||||||||
use tokio::sync::Mutex; | ||||||||||
use tracing::error; | ||||||||||
use tracing::info; | ||||||||||
use tracing::warn; | ||||||||||
|
@@ -239,6 +241,8 @@ impl BrokerRuntime { | |||||||||
slave_synchronize: None, | ||||||||||
last_sync_time_ms: AtomicU64::new(get_current_millis()), | ||||||||||
broker_pre_online_service: None, | ||||||||||
min_broker_id_in_group: AtomicU64::new(0), | ||||||||||
min_broker_addr_in_group: Default::default(), | ||||||||||
}); | ||||||||||
let mut stats_manager = BrokerStatsManager::new(inner.broker_config.clone()); | ||||||||||
stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter { | ||||||||||
|
@@ -1610,6 +1614,8 @@ pub(crate) struct BrokerRuntimeInner<MS: MessageStore> { | |||||||||
slave_synchronize: Option<SlaveSynchronize<MS>>, | ||||||||||
last_sync_time_ms: AtomicU64, | ||||||||||
broker_pre_online_service: Option<BrokerPreOnlineService<MS>>, | ||||||||||
min_broker_id_in_group: AtomicU64, | ||||||||||
min_broker_addr_in_group: Mutex<Option<CheetahString>>, | ||||||||||
} | ||||||||||
|
||||||||||
impl<MS: MessageStore> BrokerRuntimeInner<MS> { | ||||||||||
|
@@ -2467,7 +2473,79 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> { | |||||||||
} | ||||||||||
|
||||||||||
async fn sync_broker_member_group(&self) { | ||||||||||
warn!("sync_broker_member_group not implemented"); | ||||||||||
let broker_cluster_name = &self.broker_config.broker_identity.broker_cluster_name; | ||||||||||
let broker_name = &self.broker_config.broker_identity.broker_name; | ||||||||||
let compatible_with_old_name_srv = self.broker_config.compatible_with_old_name_srv; | ||||||||||
let broker_member_group = self | ||||||||||
.broker_outer_api | ||||||||||
.sync_broker_member_group( | ||||||||||
broker_cluster_name, | ||||||||||
broker_name, | ||||||||||
compatible_with_old_name_srv, | ||||||||||
) | ||||||||||
.await; | ||||||||||
|
||||||||||
if let Err(ref e) = broker_member_group { | ||||||||||
error!("syncBrokerMemberGroup from namesrv failed, error={}", e); | ||||||||||
return; | ||||||||||
} | ||||||||||
let broker_member_group = broker_member_group.unwrap(); | ||||||||||
if broker_member_group.is_none() | ||||||||||
|| broker_member_group | ||||||||||
.as_ref() | ||||||||||
.unwrap() | ||||||||||
.broker_addrs | ||||||||||
.is_empty() | ||||||||||
{ | ||||||||||
warn!( | ||||||||||
"Couldn't find any broker member from namesrv in {}/{}", | ||||||||||
broker_cluster_name, broker_name | ||||||||||
); | ||||||||||
return; | ||||||||||
} | ||||||||||
fn calc_alive_broker_num_in_group( | ||||||||||
broker_addr_table: &HashMap< | ||||||||||
u64, /* brokerId */ | ||||||||||
CheetahString, /* broker address */ | ||||||||||
>, | ||||||||||
broker_id: u64, | ||||||||||
) -> usize { | ||||||||||
if broker_addr_table.contains_key(&broker_id) { | ||||||||||
broker_addr_table.len() | ||||||||||
} else { | ||||||||||
broker_addr_table.len() + 1 | ||||||||||
} | ||||||||||
} | ||||||||||
let broker_member_group = broker_member_group.unwrap(); | ||||||||||
self.message_store_unchecked() | ||||||||||
.set_alive_replica_num_in_group(calc_alive_broker_num_in_group( | ||||||||||
&broker_member_group.broker_addrs, | ||||||||||
self.broker_config.broker_identity.broker_id, | ||||||||||
) as i32); | ||||||||||
if !self.is_isolated.load(Ordering::Acquire) { | ||||||||||
let min_broker_id = broker_member_group.minimum_broker_id(); | ||||||||||
let min_broker_addr = broker_member_group | ||||||||||
.broker_addrs | ||||||||||
.get(&min_broker_id) | ||||||||||
.cloned() | ||||||||||
.unwrap_or_default(); | ||||||||||
self.update_min_broker(min_broker_id, min_broker_addr).await; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) { | ||||||||||
if self.broker_config.enable_slave_acting_master | ||||||||||
&& self.broker_config.broker_identity.broker_id != MASTER_ID | ||||||||||
{ | ||||||||||
let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst); | ||||||||||
if min_broker_id != min_broker_id_in_group { | ||||||||||
let mut offline_broker_addr = None; | ||||||||||
if min_broker_id > min_broker_id_in_group { | ||||||||||
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone(); | ||||||||||
} | ||||||||||
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The minimum broker ID and address are updated in memory but never stored back to the atomic fields. After calling
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
Comment on lines
+2536
to
2549
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainBug: state not updated; repeated events; and potential panic downstream update_min_broker never stores the new min ID or address. This causes repeated change detections and breaks offline detection. Fix by storing atomically and updating the address under the mutex before invoking the callback. Use Acquire/Release rather than SeqCst here. Apply this diff: - pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
- if self.broker_config.enable_slave_acting_master
- && self.broker_config.broker_identity.broker_id != MASTER_ID
- {
- let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
- if min_broker_id != min_broker_id_in_group {
- let mut offline_broker_addr = None;
- if min_broker_id > min_broker_id_in_group {
- offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
- }
- self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
- }
- }
- }
+ pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+ if !(self.broker_config.enable_slave_acting_master
+ && self.broker_config.broker_identity.broker_id != MASTER_ID)
+ {
+ return;
+ }
+ let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+ let mut offline_broker_addr = None;
+ {
+ let mut guard = self.min_broker_addr_in_group.lock().await;
+ if min_broker_id > prev_id {
+ offline_broker_addr = guard.clone();
+ }
+ *guard = Some(min_broker_addr.clone());
+ }
+ if min_broker_id != prev_id {
+ self.min_broker_id_in_group
+ .store(min_broker_id, Ordering::Release);
+ self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+ }
+ } Additionally, get_min_broker_id_in_group currently returns the local broker_id, not the tracked minimum. Please update it accordingly (outside this hunk): pub fn get_min_broker_id_in_group(&self) -> u64 {
self.min_broker_id_in_group.load(Ordering::Acquire)
} 🏁 Script executed: #!/bin/bash
# Verify call sites and ensure semantics rely on the tracked min-id, not local broker_id.
rg -n -C2 'get_min_broker_id_in_group\('
rg -n -C2 'update_min_broker\s*\('
rg -n -C2 'on_min_broker_change\s*\(' Length of output: 8218 Critical: persist min-broker state atomically before invoking on_min_broker_change
Locations: rocketmq-broker/src/broker_runtime.rs (update_min_broker ≈ lines 2536–2549; get_min_broker_id_in_group ≈ lines 2251–2253). Callers: processor/admin_broker_processor/notify_min_broker_id_handler.rs and processor/default_pull_message_result_handler.rs. Apply this diff: - pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
- if self.broker_config.enable_slave_acting_master
- && self.broker_config.broker_identity.broker_id != MASTER_ID
- {
- let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
- if min_broker_id != min_broker_id_in_group {
- let mut offline_broker_addr = None;
- if min_broker_id > min_broker_id_in_group {
- offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
- }
- self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
- }
- }
- }
+ pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
+ if !(self.broker_config.enable_slave_acting_master
+ && self.broker_config.broker_identity.broker_id != MASTER_ID)
+ {
+ return;
+ }
+ let prev_id = self.min_broker_id_in_group.load(Ordering::Acquire);
+ let mut offline_broker_addr = None;
+ {
+ let mut guard = self.min_broker_addr_in_group.lock().await;
+ if min_broker_id > prev_id {
+ offline_broker_addr = guard.clone();
+ }
+ *guard = Some(min_broker_addr.clone());
+ }
+ if min_broker_id != prev_id {
+ self.min_broker_id_in_group
+ .store(min_broker_id, Ordering::Release);
+ self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None);
+ }
+ } Additionally, update get_min_broker_id_in_group to: pub fn get_min_broker_id_in_group(&self) -> u64 {
self.min_broker_id_in_group.load(Ordering::Acquire)
} 🤖 Prompt for AI Agents
|
||||||||||
|
||||||||||
pub fn pop_message_processor_unchecked(&self) -> &ArcMut<PopMessageProcessor<MS>> { | ||||||||||
|
@@ -2568,8 +2646,8 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> { | |||||||||
} | ||||||||||
|
||||||||||
fn on_min_broker_change( | ||||||||||
&mut self, | ||||||||||
_min_broker_id: i64, | ||||||||||
&self, | ||||||||||
_min_broker_id: u64, | ||||||||||
_min_broker_addr: CheetahString, | ||||||||||
_offline_broker_addr: Option<CheetahString>, | ||||||||||
_master_ha_addr: Option<CheetahString>, | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing
.await
keyword for the async method call. Theon_min_broker_change
method appears to be async based on the context, but it's being called without awaiting the result.Copilot uses AI. Check for mistakes.