diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 28a78abc9..c883d2a2c 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -29,6 +29,7 @@ use rocketmq_common::common::config::TopicConfig; use rocketmq_common::common::config_manager::ConfigManager; use rocketmq_common::common::constant::PermName; use rocketmq_common::common::mix_all; +use rocketmq_common::common::mix_all::MASTER_ID; use rocketmq_common::common::server::config::ServerConfig; use rocketmq_common::common::statistics::state_getter::StateGetter; use rocketmq_common::TimeUtils::get_current_millis; @@ -55,6 +56,7 @@ use rocketmq_store::message_store::local_file_message_store::LocalFileMessageSto use rocketmq_store::stats::broker_stats::BrokerStats; use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager; use rocketmq_store::timer::timer_message_store::TimerMessageStore; +use tokio::sync::Mutex; use tracing::error; use tracing::info; use tracing::warn; @@ -239,6 +241,8 @@ impl BrokerRuntime { slave_synchronize: None, last_sync_time_ms: AtomicU64::new(get_current_millis()), broker_pre_online_service: None, + min_broker_id_in_group: AtomicU64::new(0), + min_broker_addr_in_group: Default::default(), }); let mut stats_manager = BrokerStatsManager::new(inner.broker_config.clone()); stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter { @@ -1610,6 +1614,8 @@ pub(crate) struct BrokerRuntimeInner { slave_synchronize: Option>, last_sync_time_ms: AtomicU64, broker_pre_online_service: Option>, + min_broker_id_in_group: AtomicU64, + min_broker_addr_in_group: Mutex>, } impl BrokerRuntimeInner { @@ -2467,7 +2473,79 @@ impl BrokerRuntimeInner { } async fn sync_broker_member_group(&self) { - warn!("sync_broker_member_group not implemented"); + let broker_cluster_name = &self.broker_config.broker_identity.broker_cluster_name; + let broker_name = &self.broker_config.broker_identity.broker_name; + let compatible_with_old_name_srv = self.broker_config.compatible_with_old_name_srv; + let broker_member_group = self + .broker_outer_api + .sync_broker_member_group( + broker_cluster_name, + broker_name, + compatible_with_old_name_srv, + ) + .await; + + if let Err(ref e) = broker_member_group { + error!("syncBrokerMemberGroup from namesrv failed, error={}", e); + return; + } + let broker_member_group = broker_member_group.unwrap(); + if broker_member_group.is_none() + || broker_member_group + .as_ref() + .unwrap() + .broker_addrs + .is_empty() + { + warn!( + "Couldn't find any broker member from namesrv in {}/{}", + broker_cluster_name, broker_name + ); + return; + } + fn calc_alive_broker_num_in_group( + broker_addr_table: &HashMap< + u64, /* brokerId */ + CheetahString, /* broker address */ + >, + broker_id: u64, + ) -> usize { + if broker_addr_table.contains_key(&broker_id) { + broker_addr_table.len() + } else { + broker_addr_table.len() + 1 + } + } + let broker_member_group = broker_member_group.unwrap(); + self.message_store_unchecked() + .set_alive_replica_num_in_group(calc_alive_broker_num_in_group( + &broker_member_group.broker_addrs, + self.broker_config.broker_identity.broker_id, + ) as i32); + if !self.is_isolated.load(Ordering::Acquire) { + let min_broker_id = broker_member_group.minimum_broker_id(); + let min_broker_addr = broker_member_group + .broker_addrs + .get(&min_broker_id) + .cloned() + .unwrap_or_default(); + self.update_min_broker(min_broker_id, min_broker_addr).await; + } + } + + pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) { + if self.broker_config.enable_slave_acting_master + && self.broker_config.broker_identity.broker_id != MASTER_ID + { + let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst); + if min_broker_id != min_broker_id_in_group { + let mut offline_broker_addr = None; + if min_broker_id > min_broker_id_in_group { + offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone(); + } + self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None) + } + } } pub fn pop_message_processor_unchecked(&self) -> &ArcMut> { @@ -2568,8 +2646,8 @@ impl BrokerRuntimeInner { } fn on_min_broker_change( - &mut self, - _min_broker_id: i64, + &self, + _min_broker_id: u64, _min_broker_addr: CheetahString, _offline_broker_addr: Option, _master_ha_addr: Option, diff --git a/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs b/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs index 35d9edcbf..b0c464375 100644 --- a/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs +++ b/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use cheetah_string::CheetahString; +use rocketmq_common::common::mix_all::MASTER_ID; use serde::Deserialize; use serde::Serialize; @@ -36,6 +37,14 @@ impl BrokerMemberGroup { broker_addrs: HashMap::new(), } } + + pub fn minimum_broker_id(&self) -> u64 { + if self.broker_addrs.is_empty() { + MASTER_ID + } else { + *self.broker_addrs.keys().min().unwrap() + } + } } #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -126,4 +135,42 @@ mod tests { assert_eq!(group.broker_name, CheetahString::from("test_broker")); assert!(group.broker_addrs.is_empty()); } + + #[test] + fn minimum_broker_id_returns_smallest_broker_id_when_present() { + let mut group = BrokerMemberGroup::new( + CheetahString::from("cluster"), + CheetahString::from("broker"), + ); + group + .broker_addrs + .insert(3, CheetahString::from("127.0.0.1:10913")); + group + .broker_addrs + .insert(1, CheetahString::from("127.0.0.1:10911")); + group + .broker_addrs + .insert(2, CheetahString::from("127.0.0.1:10912")); + + assert_eq!(group.minimum_broker_id(), 1); + } + + #[test] + fn minimum_broker_id_handles_zero_and_large_ids() { + let mut group = BrokerMemberGroup::new( + CheetahString::from("cluster"), + CheetahString::from("broker"), + ); + group + .broker_addrs + .insert(0, CheetahString::from("127.0.0.1:10910")); + group + .broker_addrs + .insert(42, CheetahString::from("127.0.0.1:10942")); + group + .broker_addrs + .insert(u64::MAX, CheetahString::from("127.0.0.1:12000")); + + assert_eq!(group.minimum_broker_id(), 0); + } }