From 53dfa0fcaffe529875c64246c02b397f4cbb809c Mon Sep 17 00:00:00 2001 From: mxsm Date: Tue, 16 Sep 2025 06:43:54 +0000 Subject: [PATCH 1/2] =?UTF-8?q?[ISSUE=20#4050]=E2=99=BB=EF=B8=8FReplace=20?= =?UTF-8?q?manual=20heartbeat=20scheduling=20with=20a=20fixed-rate=20task?= =?UTF-8?q?=20manager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 42 ++++++++++----------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 6ee8fbbe..b34444fa 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -1190,33 +1190,17 @@ impl BrokerRuntime { if self.inner.broker_config.enable_slave_acting_master { self.schedule_send_heartbeat(); - let broker_runtime_inner = self.inner.clone(); - self.broker_runtime - .as_ref() - .unwrap() - .get_handle() - .spawn(async move { - let period = Duration::from_secs(1); - let initial_delay = Duration::from_millis( - broker_runtime_inner - .broker_config - .sync_broker_member_group_period, - ); - tokio::time::sleep(initial_delay).await; - loop { - // record current execution time - let current_execution_time = tokio::time::Instant::now(); - // execute task - broker_runtime_inner.sync_broker_member_group(); - // Calculate the time of the next execution - let next_execution_time = current_execution_time + period; - - // Wait until the next execution - let delay = next_execution_time - .saturating_duration_since(tokio::time::Instant::now()); - tokio::time::sleep(delay).await; - } - }); + let sync_broker_member_group_period = + self.inner.broker_config.sync_broker_member_group_period; + let inner_ = self.inner.clone(); + self.scheduled_task_manager.add_fixed_rate_task_async( + Duration::from_millis(1000), + Duration::from_millis(sync_broker_member_group_period), + async move |_ctx| { + inner_.sync_broker_member_group().await; + Ok(()) + }, + ); } if self.inner.broker_config.enable_controller_mode { @@ -1270,6 +1254,10 @@ impl BrokerRuntime { Duration::from_millis(1000), Duration::from_millis(broker_heartbeat_interval), async move |_ctx| { + if inner_.is_isolated.load(Ordering::Acquire) { + info!("Skip send heartbeat for broker is isolated"); + return Ok(()); + } inner_.send_heartbeat().await; Ok(()) }, From 562080a08e56472a367ed2d499062557597f9827 Mon Sep 17 00:00:00 2001 From: mxsm Date: Tue, 16 Sep 2025 06:52:19 +0000 Subject: [PATCH 2/2] fix code compile error --- rocketmq-broker/src/broker_runtime.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index b34444fa..28a78abc 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -2466,7 +2466,7 @@ impl BrokerRuntimeInner { CheetahString::from_string(addr) } - pub fn sync_broker_member_group(&self) { + async fn sync_broker_member_group(&self) { warn!("sync_broker_member_group not implemented"); }