diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 1db54fa2..a6994b9b 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -1455,7 +1455,7 @@ impl BrokerRuntimeInner { } } if let Some(slave_synchronize) = &mut self.slave_synchronize { - slave_synchronize.set_master_addr(Some(result.master_addr)); + slave_synchronize.set_master_addr(Some(&result.master_addr)); } if check_order_config { if let Some(topic_config_manager) = &mut self.topic_config_manager { @@ -2063,10 +2063,25 @@ impl BrokerRuntimeInner { self.slave_synchronize.as_ref() } + #[inline] + pub fn slave_synchronize_unchecked(&self) -> &SlaveSynchronize { + unsafe { self.slave_synchronize.as_ref().unwrap_unchecked() } + } + + #[inline] + pub fn slave_synchronize_mut(&mut self) -> Option<&mut SlaveSynchronize> { + self.slave_synchronize.as_mut() + } + + #[inline] + pub fn slave_synchronize_mut_unchecked(&mut self) -> &mut SlaveSynchronize { + unsafe { self.slave_synchronize.as_mut().unwrap_unchecked() } + } + #[inline] pub fn update_slave_master_addr(&mut self, master_addr: Option) { if let Some(ref mut slave) = self.slave_synchronize { - slave.set_master_addr(master_addr); + slave.set_master_addr(master_addr.as_ref()); }; } @@ -2702,7 +2717,16 @@ impl BrokerRuntimeInner { } async fn on_master_offline(&mut self) { - error!("unimplemented") + let slave_synchronize = self.slave_synchronize_unchecked(); + if let Some(master_addr) = slave_synchronize.master_addr() { + if !master_addr.is_empty() { + //close channels + } + } + self.slave_synchronize_mut_unchecked().set_master_addr(None); + self.message_store_unchecked_mut() + .update_ha_master_address("") + .await } async fn send_heartbeat(&self) { diff --git a/rocketmq-broker/src/slave/slave_synchronize.rs b/rocketmq-broker/src/slave/slave_synchronize.rs index 673c3a8e..82309e8b 100644 --- a/rocketmq-broker/src/slave/slave_synchronize.rs +++ b/rocketmq-broker/src/slave/slave_synchronize.rs @@ -47,8 +47,8 @@ where self.master_addr.as_ref() } - pub fn set_master_addr(&mut self, addr: Option>) { - let addr = addr.map(|addr| addr.into()); + pub fn set_master_addr(&mut self, addr: Option<&CheetahString>) { + let addr = addr.cloned(); if self.master_addr == addr { return; }