From 8d9e5e2d3eb8ddd5b99ccb8b8513b55c6b41276a Mon Sep 17 00:00:00 2001 From: jon Date: Mon, 8 Sep 2025 16:00:44 +0800 Subject: [PATCH 01/19] add notify min broker id change --- .../notify_min_broker_id_handler.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs new file mode 100644 index 00000000..901a3bb8 --- /dev/null +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use rocketmq_remoting::code::request_code::RequestCode; +use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::header::namesrv::brokerid_change_request_header::NotifyMinBrokerIdChangeRequestHeader; +use rocketmq_remoting::protocol::remoting_command::RemotingCommand; +use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +use rocketmq_rust::ArcMut; +use rocketmq_store::base::message_store::MessageStore; +use tracing::warn; + +use crate::broker_runtime::BrokerRuntimeInner; + +#[derive(Clone)] +pub struct NotifyMinBrokerChangeIdHandler { + broker_runtime_inner: ArcMut>, +} + +impl NotifyMinBrokerChangeIdHandler { + pub fn new(broker_runtime_inner: ArcMut>) -> Self { + Self { + broker_runtime_inner, + } + } + + pub async fn notify_min_broker_id_change( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + request: RemotingCommand, + ) -> Option { + let change_header = request + .decode_command_custom_header::() + .unwrap(); + + let current_broker_id = self + .broker_runtime_inner + .broker_config() + .broker_identity + .broker_id; + + warn!( + "min broker id changed, prev {}, new {}", + current_broker_id, + change_header + .min_broker_id + .expect("min broker id not must be present") + ); + + // TODO Implement update broker id method in the near future + + let mut response = RemotingCommand::default(); + response.set_code_ref(ResponseCode::Success); + Some(response) + } +} From f2f5836ece6ae42ca81e9b86c1f2a9dd3317bd54 Mon Sep 17 00:00:00 2001 From: jon Date: Mon, 8 Sep 2025 16:01:16 +0800 Subject: [PATCH 02/19] add notify min broker id change --- .../src/processor/admin_broker_processor.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index d78fe454..6ea36bce 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -29,6 +29,7 @@ use crate::broker_runtime::BrokerRuntimeInner; use crate::processor::admin_broker_processor::batch_mq_handler::BatchMqHandler; use crate::processor::admin_broker_processor::broker_config_request_handler::BrokerConfigRequestHandler; use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler; +use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler; use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler; use crate::processor::admin_broker_processor::subscription_group_handler::SubscriptionGroupHandler; use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler; @@ -36,6 +37,7 @@ use crate::processor::admin_broker_processor::topic_request_handler::TopicReques mod batch_mq_handler; mod broker_config_request_handler; mod consumer_request_handler; +mod notify_min_broker_id_handler; mod offset_request_handler; mod subscription_group_handler; mod topic_request_handler; @@ -49,6 +51,8 @@ pub struct AdminBrokerProcessor { subscription_group_handler: SubscriptionGroupHandler, broker_runtime_inner: ArcMut>, + + notify_min_broker_handler: NotifyMinBrokerChangeIdHandler, } impl RequestProcessor for AdminBrokerProcessor @@ -78,6 +82,9 @@ impl AdminBrokerProcessor { let batch_mq_handler = BatchMqHandler::new(broker_runtime_inner.clone()); let subscription_group_handler = SubscriptionGroupHandler::new(broker_runtime_inner.clone()); + + let notify_min_broker_handler = + NotifyMinBrokerChangeIdHandler::new(broker_runtime_inner.clone()); AdminBrokerProcessor { topic_request_handler, broker_config_request_handler, @@ -86,6 +93,7 @@ impl AdminBrokerProcessor { batch_mq_handler, subscription_group_handler, broker_runtime_inner, + notify_min_broker_handler, } } } @@ -212,6 +220,11 @@ impl AdminBrokerProcessor { .get_all_subscription_group_config(channel, ctx, request_code, request) .await } + RequestCode::NotifyMinBrokerIdChange => { + self.notify_min_broker_handler + .notify_min_broker_id_change(channel, ctx, request_code, request) + .await + } _ => Some(get_unknown_cmd_response(request_code)), } } From daad8ef3dbf2e41218a66c53cf66e068c1990b9c Mon Sep 17 00:00:00 2001 From: jon Date: Tue, 9 Sep 2025 14:48:14 +0800 Subject: [PATCH 03/19] Broker id Change function external logic implementation --- .../notify_min_broker_id_handler.rs | 56 +++++++++++++++---- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index 901a3bb8..fc24cf55 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -15,6 +15,10 @@ * limitations under the License. */ +use std::sync::Arc; +use std::time::Duration; + +use rocketmq_common::common::mix_all::MASTER_ID; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::net::channel::Channel; @@ -22,20 +26,23 @@ use rocketmq_remoting::protocol::header::namesrv::brokerid_change_request_header use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; use rocketmq_rust::ArcMut; +use rocketmq_rust::RocketMQTokioMutex; use rocketmq_store::base::message_store::MessageStore; +use tracing::error; use tracing::warn; use crate::broker_runtime::BrokerRuntimeInner; - #[derive(Clone)] pub struct NotifyMinBrokerChangeIdHandler { broker_runtime_inner: ArcMut>, + lock: Arc>, } impl NotifyMinBrokerChangeIdHandler { pub fn new(broker_runtime_inner: ArcMut>) -> Self { Self { - broker_runtime_inner, + broker_runtime_inner: broker_runtime_inner, + lock: Arc::new(RocketMQTokioMutex::new(())), } } @@ -50,24 +57,49 @@ impl NotifyMinBrokerChangeIdHandler { .decode_command_custom_header::() .unwrap(); - let current_broker_id = self - .broker_runtime_inner - .broker_config() - .broker_identity - .broker_id; + let broker_config = self.broker_runtime_inner.broker_config(); + + let latest_broker_id = change_header + .min_broker_id + .expect("min broker id not must be present"); warn!( "min broker id changed, prev {}, new {}", - current_broker_id, - change_header - .min_broker_id - .expect("min broker id not must be present") + broker_config.broker_identity.broker_id, latest_broker_id ); - // TODO Implement update broker id method in the near future + self.update_min_broker(change_header).await; let mut response = RemotingCommand::default(); response.set_code_ref(ResponseCode::Success); Some(response) } + + async fn update_min_broker(&self, change_header: NotifyMinBrokerIdChangeRequestHeader) { + let broker_config = self.broker_runtime_inner.broker_config(); + + if broker_config.enable_slave_acting_master + && broker_config.broker_identity.broker_id != MASTER_ID + { + let lock = self + .lock + .try_lock_timeout(Duration::from_millis(3000)) + .await; + + if let Some(_) = lock { + if let Some(min_broker_id) = change_header.min_broker_id { + if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() { + // on min broker change + self.on_min_broker_change(&change_header); + } + } + } else { + error!("Update min broker failed"); + } + } + } + + fn on_min_broker_change(&self, _change_header: &NotifyMinBrokerIdChangeRequestHeader) { + // data update specific logic + } } From 2547c9b024ecf05046523728614fe7658e957588 Mon Sep 17 00:00:00 2001 From: jon Date: Tue, 9 Sep 2025 18:15:52 +0800 Subject: [PATCH 04/19] Broker id Change function external logic implementation --- .../notify_min_broker_id_handler.rs | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index fc24cf55..d1bf70a0 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -15,9 +15,11 @@ * limitations under the License. */ +use std::ops::Deref; use std::sync::Arc; use std::time::Duration; +use cheetah_string::CheetahString; use rocketmq_common::common::mix_all::MASTER_ID; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; @@ -29,6 +31,7 @@ use rocketmq_rust::ArcMut; use rocketmq_rust::RocketMQTokioMutex; use rocketmq_store::base::message_store::MessageStore; use tracing::error; +use tracing::info; use tracing::warn; use crate::broker_runtime::BrokerRuntimeInner; @@ -36,6 +39,8 @@ use crate::broker_runtime::BrokerRuntimeInner; pub struct NotifyMinBrokerChangeIdHandler { broker_runtime_inner: ArcMut>, lock: Arc>, + min_broker_id_in_group: Arc>, + min_broker_addr_in_group: Arc, } impl NotifyMinBrokerChangeIdHandler { @@ -43,6 +48,8 @@ impl NotifyMinBrokerChangeIdHandler { Self { broker_runtime_inner: broker_runtime_inner, lock: Arc::new(RocketMQTokioMutex::new(())), + min_broker_id_in_group: Arc::new(Some(MASTER_ID)), + min_broker_addr_in_group: Arc::new(CheetahString::empty()), } } @@ -75,21 +82,36 @@ impl NotifyMinBrokerChangeIdHandler { Some(response) } - async fn update_min_broker(&self, change_header: NotifyMinBrokerIdChangeRequestHeader) { + async fn update_min_broker(&mut self, change_header: NotifyMinBrokerIdChangeRequestHeader) { let broker_config = self.broker_runtime_inner.broker_config(); if broker_config.enable_slave_acting_master && broker_config.broker_identity.broker_id != MASTER_ID { - let lock = self + if let Some(_) = self .lock .try_lock_timeout(Duration::from_millis(3000)) - .await; - - if let Some(_) = lock { + .await + { if let Some(min_broker_id) = change_header.min_broker_id { if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() { // on min broker change + // + let latest_broker_id = change_header.min_broker_id.unwrap(); + let latest_broker_addr = change_header.min_broker_addr.as_deref().unwrap(); + + info!( + "Min broker changed, old: {}-{}, new {}-{}", + self.broker_runtime_inner.get_min_broker_id_in_group(), + self.broker_runtime_inner.get_broker_addr(), + latest_broker_id, + latest_broker_addr + ); + + self.min_broker_id_in_group = Arc::new(Some(latest_broker_id)); + self.min_broker_addr_in_group = + Arc::new(CheetahString::from_string(latest_broker_addr.to_string())); + self.on_min_broker_change(&change_header); } } @@ -100,6 +122,7 @@ impl NotifyMinBrokerChangeIdHandler { } fn on_min_broker_change(&self, _change_header: &NotifyMinBrokerIdChangeRequestHeader) { + // data update specific logic } } From 79344eac1e7253476c98e94630c063067d0f24e6 Mon Sep 17 00:00:00 2001 From: jon Date: Wed, 10 Sep 2025 21:45:12 +0800 Subject: [PATCH 05/19] Broker id Change function external logic implementation --- .../notify_min_broker_id_handler.rs | 111 +++++++++++++----- 1 file changed, 83 insertions(+), 28 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index d1bf70a0..821ec0e8 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -16,6 +16,7 @@ */ use std::ops::Deref; +use std::ops::DerefMut; use std::sync::Arc; use std::time::Duration; @@ -28,28 +29,41 @@ use rocketmq_remoting::protocol::header::namesrv::brokerid_change_request_header use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; use rocketmq_rust::ArcMut; -use rocketmq_rust::RocketMQTokioMutex; +use rocketmq_rust::RocketMQTokioRwLock; use rocketmq_store::base::message_store::MessageStore; use tracing::error; use tracing::info; use tracing::warn; use crate::broker_runtime::BrokerRuntimeInner; +use crate::slave::slave_synchronize::SlaveSynchronize; + #[derive(Clone)] pub struct NotifyMinBrokerChangeIdHandler { broker_runtime_inner: ArcMut>, - lock: Arc>, - min_broker_id_in_group: Arc>, - min_broker_addr_in_group: Arc, + lock: Arc>, +} + +#[derive(Clone)] +struct MinBrokerIngroup { + min_broker_id_in_group: Option, + min_broker_addr_in_group: CheetahString, +} + +impl MinBrokerIngroup { + fn new() -> Self { + Self { + min_broker_id_in_group: None, + min_broker_addr_in_group: CheetahString::empty(), + } + } } impl NotifyMinBrokerChangeIdHandler { pub fn new(broker_runtime_inner: ArcMut>) -> Self { Self { - broker_runtime_inner: broker_runtime_inner, - lock: Arc::new(RocketMQTokioMutex::new(())), - min_broker_id_in_group: Arc::new(Some(MASTER_ID)), - min_broker_addr_in_group: Arc::new(CheetahString::empty()), + broker_runtime_inner, + lock: Arc::new(RocketMQTokioRwLock::new(MinBrokerIngroup::new())), } } @@ -90,29 +104,13 @@ impl NotifyMinBrokerChangeIdHandler { { if let Some(_) = self .lock - .try_lock_timeout(Duration::from_millis(3000)) + .try_write_timeout(Duration::from_millis(3000)) .await { if let Some(min_broker_id) = change_header.min_broker_id { if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() { // on min broker change - // - let latest_broker_id = change_header.min_broker_id.unwrap(); - let latest_broker_addr = change_header.min_broker_addr.as_deref().unwrap(); - - info!( - "Min broker changed, old: {}-{}, new {}-{}", - self.broker_runtime_inner.get_min_broker_id_in_group(), - self.broker_runtime_inner.get_broker_addr(), - latest_broker_id, - latest_broker_addr - ); - - self.min_broker_id_in_group = Arc::new(Some(latest_broker_id)); - self.min_broker_addr_in_group = - Arc::new(CheetahString::from_string(latest_broker_addr.to_string())); - - self.on_min_broker_change(&change_header); + self.on_min_broker_change(&change_header).await; } } } else { @@ -121,8 +119,65 @@ impl NotifyMinBrokerChangeIdHandler { } } - fn on_min_broker_change(&self, _change_header: &NotifyMinBrokerIdChangeRequestHeader) { + async fn on_min_broker_change(&self, change_header: &NotifyMinBrokerIdChangeRequestHeader) { + let min_broker_id = change_header.min_broker_id.unwrap(); + let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap(); + + info!( + "Min broker changed, old: {}-{}, new {}-{}", + self.broker_runtime_inner.get_min_broker_id_in_group(), + self.broker_runtime_inner.get_broker_addr(), + min_broker_id, + min_broker_addr + ); + + self.lock.write().await.min_broker_id_in_group = Some(min_broker_id); + self.lock.write().await.min_broker_addr_in_group = + CheetahString::from_slice(min_broker_addr); + + let should_start = self.broker_runtime_inner.get_min_broker_id_in_group() + == self.lock.read().await.min_broker_id_in_group.unwrap(); + + self.change_special_service_status(should_start).await; + + // master offline + if let Some(offline_broker_addr) = change_header.offline_broker_addr.as_deref() { + if let Some(slave_sync) = self.broker_runtime_inner.slave_synchronize() { + match slave_sync.master_addr() { + Some(master_addr) => { + if offline_broker_addr.eq(master_addr.deref()) { + self.on_master_offline().await; + } + } + + None => {} + } + } + } + + //master online + if min_broker_id == MASTER_ID { + self.on_master_on_line().await; + } + } + + async fn change_special_service_status(&self, _should_start: bool) {} + + async fn on_master_offline(&self) { + let broker_runtime_inner = self.broker_runtime_inner.clone(); + + let _slave_synchronize = SlaveSynchronize::new(broker_runtime_inner); + + self.lock.write().await.min_broker_id_in_group = Some(123); + } + + async fn on_master_on_line(&self) {} + + pub async fn get_min_broker_id_in_group(&self) -> Option { + self.lock.read().await.min_broker_id_in_group + } - // data update specific logic + pub async fn get_min_broker_addr_in_group(&self) -> CheetahString { + self.lock.read().await.min_broker_addr_in_group.clone() } } From 1471dd5fbb03beb98b2057f2759c148e77d0621a Mon Sep 17 00:00:00 2001 From: jon Date: Wed, 10 Sep 2025 23:52:41 +0800 Subject: [PATCH 06/19] Implementation Notification Broker Id Change Logic --- rocketmq-broker/src/broker_runtime.rs | 12 +++ .../notify_min_broker_id_handler.rs | 92 +++++++++++++++---- 2 files changed, 88 insertions(+), 16 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 04f110ca..fa2aaf16 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -2057,6 +2057,18 @@ impl BrokerRuntimeInner { &self.pop_inflight_message_counter } + #[inline] + pub fn slave_synchronize(&self) -> &Option> { + &self.slave_synchronize + } + + #[inline] + pub fn set_master_addr(&mut self, master_addr: CheetahString) { + if let Some(ref mut slave) = self.slave_synchronize { + slave.set_master_addr(master_addr); + }; + } + #[inline] pub fn set_store_host(&mut self, store_host: SocketAddr) { self.store_host = store_host; diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index 821ec0e8..bae89fcb 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -16,7 +16,6 @@ */ use std::ops::Deref; -use std::ops::DerefMut; use std::sync::Arc; use std::time::Duration; @@ -36,7 +35,6 @@ use tracing::info; use tracing::warn; use crate::broker_runtime::BrokerRuntimeInner; -use crate::slave::slave_synchronize::SlaveSynchronize; #[derive(Clone)] pub struct NotifyMinBrokerChangeIdHandler { @@ -53,7 +51,7 @@ struct MinBrokerIngroup { impl MinBrokerIngroup { fn new() -> Self { Self { - min_broker_id_in_group: None, + min_broker_id_in_group: Some(MASTER_ID), min_broker_addr_in_group: CheetahString::empty(), } } @@ -110,7 +108,18 @@ impl NotifyMinBrokerChangeIdHandler { if let Some(min_broker_id) = change_header.min_broker_id { if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() { // on min broker change - self.on_min_broker_change(&change_header).await; + let min_broker_id = change_header.min_broker_id.unwrap(); + let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap(); + let offline_broker_addr = &change_header.offline_broker_addr; + let master_ha_addr = &change_header.ha_broker_addr; + + self.on_min_broker_change( + min_broker_id, + min_broker_addr, + offline_broker_addr, + master_ha_addr, + ) + .await; } } } else { @@ -119,10 +128,13 @@ impl NotifyMinBrokerChangeIdHandler { } } - async fn on_min_broker_change(&self, change_header: &NotifyMinBrokerIdChangeRequestHeader) { - let min_broker_id = change_header.min_broker_id.unwrap(); - let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap(); - + async fn on_min_broker_change( + &self, + min_broker_id: u64, + min_broker_addr: &str, + offline_broker_addr: &Option, + master_ha_addr: &Option, + ) { info!( "Min broker changed, old: {}-{}, new {}-{}", self.broker_runtime_inner.get_min_broker_id_in_group(), @@ -141,7 +153,7 @@ impl NotifyMinBrokerChangeIdHandler { self.change_special_service_status(should_start).await; // master offline - if let Some(offline_broker_addr) = change_header.offline_broker_addr.as_deref() { + if let Some(offline_broker_addr) = offline_broker_addr { if let Some(slave_sync) = self.broker_runtime_inner.slave_synchronize() { match slave_sync.master_addr() { Some(master_addr) => { @@ -149,7 +161,6 @@ impl NotifyMinBrokerChangeIdHandler { self.on_master_offline().await; } } - None => {} } } @@ -157,21 +168,70 @@ impl NotifyMinBrokerChangeIdHandler { //master online if min_broker_id == MASTER_ID { - self.on_master_on_line().await; + self.on_master_on_line(min_broker_addr, master_ha_addr) + .await; } } - async fn change_special_service_status(&self, _should_start: bool) {} + async fn change_special_service_status(&self, should_start: bool) { + self.broker_runtime_inner + .mut_from_ref() + .change_special_service_status(should_start) + .await; + } async fn on_master_offline(&self) { - let broker_runtime_inner = self.broker_runtime_inner.clone(); + let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref(); - let _slave_synchronize = SlaveSynchronize::new(broker_runtime_inner); + if let Some(slave_synchronize) = broker_runtime_inner.slave_synchronize() { + match slave_synchronize.master_addr() { + Some(_master_addr) => { + // Call the close client method + } + None => {} + } + } - self.lock.write().await.min_broker_id_in_group = Some(123); + broker_runtime_inner.set_master_addr(CheetahString::empty()); + if let Some(message_store) = broker_runtime_inner.message_store() { + message_store.update_master_address(&CheetahString::empty()); + } } - async fn on_master_on_line(&self) {} + async fn on_master_on_line( + &self, + _min_broker_addr: &str, + master_ha_addr: &Option, + ) { + let need_sync_master_flush_offset = + if let Some(message_store) = self.broker_runtime_inner.message_store() { + if message_store.get_master_flushed_offset() == 0x000 + && self + .broker_runtime_inner + .message_store_config() + .sync_master_flush_offset_when_startup + { + true + } else { + false + } + } else { + false + }; + + if master_ha_addr.is_none() || need_sync_master_flush_offset { + if need_sync_master_flush_offset { + unimplemented!(); + } + + if master_ha_addr.is_none() { + let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref(); + if let Some(_message_store) = broker_runtime_inner.message_store() { + unimplemented!(""); + } + } + } + } pub async fn get_min_broker_id_in_group(&self) -> Option { self.lock.read().await.min_broker_id_in_group From c15c6f365e9f6af44853b8a8ca1982ff2323c18f Mon Sep 17 00:00:00 2001 From: jon Date: Thu, 11 Sep 2025 00:07:24 +0800 Subject: [PATCH 07/19] Implementation Notification Broker Id Change Logic --- .../admin_broker_processor/notify_min_broker_id_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index bae89fcb..ecb730bc 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -167,7 +167,7 @@ impl NotifyMinBrokerChangeIdHandler { } //master online - if min_broker_id == MASTER_ID { + if min_broker_id == MASTER_ID || !min_broker_addr.is_empty() { self.on_master_on_line(min_broker_addr, master_ha_addr) .await; } From b0f772a4f71a47b1984eee199ab4c2c07e603466 Mon Sep 17 00:00:00 2001 From: jon Date: Thu, 11 Sep 2025 00:17:04 +0800 Subject: [PATCH 08/19] Implementation Notification Broker Id Change Logic --- .../notify_min_broker_id_handler.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index ecb730bc..d826843b 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -100,10 +100,11 @@ impl NotifyMinBrokerChangeIdHandler { if broker_config.enable_slave_acting_master && broker_config.broker_identity.broker_id != MASTER_ID { - if let Some(_) = self + if self .lock .try_write_timeout(Duration::from_millis(3000)) .await + .is_some() { if let Some(min_broker_id) = change_header.min_broker_id { if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() { @@ -143,9 +144,9 @@ impl NotifyMinBrokerChangeIdHandler { min_broker_addr ); - self.lock.write().await.min_broker_id_in_group = Some(min_broker_id); - self.lock.write().await.min_broker_addr_in_group = - CheetahString::from_slice(min_broker_addr); + let mut lock_guard = self.lock.write().await; + lock_guard.min_broker_id_in_group = Some(min_broker_id); + lock_guard.min_broker_addr_in_group = CheetahString::from_slice(min_broker_addr); let should_start = self.broker_runtime_inner.get_min_broker_id_in_group() == self.lock.read().await.min_broker_id_in_group.unwrap(); @@ -155,13 +156,10 @@ impl NotifyMinBrokerChangeIdHandler { // master offline if let Some(offline_broker_addr) = offline_broker_addr { if let Some(slave_sync) = self.broker_runtime_inner.slave_synchronize() { - match slave_sync.master_addr() { - Some(master_addr) => { - if offline_broker_addr.eq(master_addr.deref()) { - self.on_master_offline().await; - } + if let Some(master_addr) = slave_sync.master_addr() { + if offline_broker_addr.eq(master_addr.deref()) { + self.on_master_offline().await; } - None => {} } } } From 78514828ba2019fa4e106faecff8409ca18a2d4f Mon Sep 17 00:00:00 2001 From: jon Date: Thu, 11 Sep 2025 00:21:15 +0800 Subject: [PATCH 09/19] Broker id Change function external logic implementation --- .../admin_broker_processor/notify_min_broker_id_handler.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index d826843b..4480f4b3 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -203,16 +203,11 @@ impl NotifyMinBrokerChangeIdHandler { ) { let need_sync_master_flush_offset = if let Some(message_store) = self.broker_runtime_inner.message_store() { - if message_store.get_master_flushed_offset() == 0x000 + message_store.get_master_flushed_offset() == 0x000 && self .broker_runtime_inner .message_store_config() .sync_master_flush_offset_when_startup - { - true - } else { - false - } } else { false }; From afdaf8d735308987a2b904bd75710aff167ca60c Mon Sep 17 00:00:00 2001 From: jon Date: Thu, 11 Sep 2025 00:21:40 +0800 Subject: [PATCH 10/19] Broker id Change function external logic implementation --- .../admin_broker_processor/notify_min_broker_id_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index 4480f4b3..2984fceb 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -203,7 +203,7 @@ impl NotifyMinBrokerChangeIdHandler { ) { let need_sync_master_flush_offset = if let Some(message_store) = self.broker_runtime_inner.message_store() { - message_store.get_master_flushed_offset() == 0x000 + message_store.get_master_flushed_offset() == 0x0000 && self .broker_runtime_inner .message_store_config() From 8a8fce3e9367e882ebd41e0d6627525a87602326 Mon Sep 17 00:00:00 2001 From: jon Date: Thu, 11 Sep 2025 00:30:53 +0800 Subject: [PATCH 11/19] Broker id Change function external logic implementation --- rocketmq-broker/src/broker_runtime.rs | 2 +- .../admin_broker_processor/notify_min_broker_id_handler.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index fa2aaf16..0b79cf4d 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -2063,7 +2063,7 @@ impl BrokerRuntimeInner { } #[inline] - pub fn set_master_addr(&mut self, master_addr: CheetahString) { + pub fn update_slave_master_addr(&mut self, master_addr: CheetahString) { if let Some(ref mut slave) = self.slave_synchronize { slave.set_master_addr(master_addr); }; diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index 2984fceb..06db88b8 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -190,7 +190,7 @@ impl NotifyMinBrokerChangeIdHandler { } } - broker_runtime_inner.set_master_addr(CheetahString::empty()); + broker_runtime_inner.update_slave_master_addr(CheetahString::empty()); if let Some(message_store) = broker_runtime_inner.message_store() { message_store.update_master_address(&CheetahString::empty()); } From 883d01818e0f977cd4d4608b268ecfbd17dc79fd Mon Sep 17 00:00:00 2001 From: jon Date: Thu, 11 Sep 2025 00:43:21 +0800 Subject: [PATCH 12/19] Broker id Change function external logic implementation --- .../admin_broker_processor/notify_min_broker_id_handler.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index 06db88b8..d9b2fe8a 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -182,11 +182,8 @@ impl NotifyMinBrokerChangeIdHandler { let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref(); if let Some(slave_synchronize) = broker_runtime_inner.slave_synchronize() { - match slave_synchronize.master_addr() { - Some(_master_addr) => { - // Call the close client method - } - None => {} + if let Some(_master_addr) = slave_synchronize.master_addr() { + // Call the close client method } } From ae123846abb1144b71ec9217d796c620f4681c73 Mon Sep 17 00:00:00 2001 From: jon Date: Sat, 13 Sep 2025 01:46:23 +0800 Subject: [PATCH 13/19] update broker ha info --- .../src/processor/admin_broker_processor.rs | 12 ++ .../notify_min_broker_id_handler.rs | 17 ++- .../update_broker_ha_handler.rs | 109 ++++++++++++++++++ rocketmq-remoting/src/protocol/header.rs | 1 + .../exchange_ha_info_response_header.rs | 52 +++++++++ 5 files changed, 181 insertions(+), 10 deletions(-) create mode 100644 rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs create mode 100644 rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index 6ea36bce..f629a26f 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -33,6 +33,7 @@ use crate::processor::admin_broker_processor::notify_min_broker_id_handler::Noti use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler; use crate::processor::admin_broker_processor::subscription_group_handler::SubscriptionGroupHandler; use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler; +use crate::processor::admin_broker_processor::update_broker_ha_handler::UpdateBrokerHaHandler; mod batch_mq_handler; mod broker_config_request_handler; @@ -41,6 +42,7 @@ mod notify_min_broker_id_handler; mod offset_request_handler; mod subscription_group_handler; mod topic_request_handler; +mod update_broker_ha_handler; pub struct AdminBrokerProcessor { topic_request_handler: TopicRequestHandler, @@ -53,6 +55,7 @@ pub struct AdminBrokerProcessor { broker_runtime_inner: ArcMut>, notify_min_broker_handler: NotifyMinBrokerChangeIdHandler, + update_broker_ha_handler: UpdateBrokerHaHandler, } impl RequestProcessor for AdminBrokerProcessor @@ -85,6 +88,9 @@ impl AdminBrokerProcessor { let notify_min_broker_handler = NotifyMinBrokerChangeIdHandler::new(broker_runtime_inner.clone()); + + let update_broker_ha_handler = UpdateBrokerHaHandler::new(broker_runtime_inner.clone()); + AdminBrokerProcessor { topic_request_handler, broker_config_request_handler, @@ -94,6 +100,7 @@ impl AdminBrokerProcessor { subscription_group_handler, broker_runtime_inner, notify_min_broker_handler, + update_broker_ha_handler, } } } @@ -225,6 +232,11 @@ impl AdminBrokerProcessor { .notify_min_broker_id_change(channel, ctx, request_code, request) .await } + RequestCode::ExchangeBrokerHaInfo => { + self.update_broker_ha_handler + .update_broker_ha_info(channel, ctx, request_code, request) + .await + } _ => Some(get_unknown_cmd_response(request_code)), } } diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index d9b2fe8a..2ad466f2 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -45,14 +45,14 @@ pub struct NotifyMinBrokerChangeIdHandler { #[derive(Clone)] struct MinBrokerIngroup { min_broker_id_in_group: Option, - min_broker_addr_in_group: CheetahString, + min_broker_addr_in_group: Arc, } impl MinBrokerIngroup { fn new() -> Self { Self { min_broker_id_in_group: Some(MASTER_ID), - min_broker_addr_in_group: CheetahString::empty(), + min_broker_addr_in_group: Arc::new(CheetahString::empty()), } } } @@ -109,16 +109,13 @@ impl NotifyMinBrokerChangeIdHandler { if let Some(min_broker_id) = change_header.min_broker_id { if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() { // on min broker change - let min_broker_id = change_header.min_broker_id.unwrap(); let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap(); - let offline_broker_addr = &change_header.offline_broker_addr; - let master_ha_addr = &change_header.ha_broker_addr; self.on_min_broker_change( min_broker_id, min_broker_addr, - offline_broker_addr, - master_ha_addr, + &change_header.offline_broker_addr, + &change_header.ha_broker_addr, ) .await; } @@ -146,7 +143,7 @@ impl NotifyMinBrokerChangeIdHandler { let mut lock_guard = self.lock.write().await; lock_guard.min_broker_id_in_group = Some(min_broker_id); - lock_guard.min_broker_addr_in_group = CheetahString::from_slice(min_broker_addr); + lock_guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr)); let should_start = self.broker_runtime_inner.get_min_broker_id_in_group() == self.lock.read().await.min_broker_id_in_group.unwrap(); @@ -157,7 +154,7 @@ impl NotifyMinBrokerChangeIdHandler { if let Some(offline_broker_addr) = offline_broker_addr { if let Some(slave_sync) = self.broker_runtime_inner.slave_synchronize() { if let Some(master_addr) = slave_sync.master_addr() { - if offline_broker_addr.eq(master_addr.deref()) { + if !master_addr.is_empty() && offline_broker_addr.eq(master_addr.deref()) { self.on_master_offline().await; } } @@ -227,7 +224,7 @@ impl NotifyMinBrokerChangeIdHandler { self.lock.read().await.min_broker_id_in_group } - pub async fn get_min_broker_addr_in_group(&self) -> CheetahString { + pub async fn get_min_broker_addr_in_group(&self) -> Arc { self.lock.read().await.min_broker_addr_in_group.clone() } } diff --git a/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs new file mode 100644 index 00000000..c10efde5 --- /dev/null +++ b/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use rocketmq_common::common::mix_all::MASTER_ID; +use rocketmq_remoting::code::request_code::RequestCode; +use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::header::exchange_ha_info_request_header::ExchangeHAInfoRequestHeader; +use rocketmq_remoting::protocol::header::exchange_ha_info_response_header::ExchangeHaInfoResponseHeader; +use rocketmq_remoting::protocol::remoting_command::RemotingCommand; +use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +use rocketmq_rust::ArcMut; +use rocketmq_store::base::message_store::MessageStore; +use tracing::info; + +use crate::broker_runtime::BrokerRuntimeInner; + +pub struct UpdateBrokerHaHandler { + broker_runtime_inner: ArcMut>, +} + +impl UpdateBrokerHaHandler { + pub fn new(broker_runtime_inner: ArcMut>) -> Self { + Self { + broker_runtime_inner, + } + } + + pub async fn update_broker_ha_info( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + request: RemotingCommand, + ) -> Option { + let exchange_request_header = request + .decode_command_custom_header::() + .unwrap(); + + let mut response = RemotingCommand::default(); + + if let Some(master_ha_addr) = exchange_request_header.master_ha_address { + if !master_ha_addr.is_empty() { + if let Some(message_store) = self.broker_runtime_inner.message_store() { + message_store + .update_ha_master_address(master_ha_addr.as_str()) + .await; + + let master_address = exchange_request_header.master_address.unwrap_or_default(); + message_store.update_master_address(&master_address); + + let is_sync_master_flush_offset_when_startip = self + .broker_runtime_inner + .message_store_config() + .sync_master_flush_offset_when_startup; + if message_store.get_master_flushed_offset() == 0x000 + && is_sync_master_flush_offset_when_startip + { + let master_flush_offset = + exchange_request_header.master_flush_offset.unwrap(); + + info!( + "Set master flush offset in slave to {}", + master_flush_offset + ); + message_store.set_master_flushed_offset(master_flush_offset); + } + } + } else if self + .broker_runtime_inner + .broker_config() + .broker_identity + .broker_id + == MASTER_ID + { + let response_header = response + .read_custom_header_mut::() + .unwrap(); + + response_header.master_ha_address = + Some(self.broker_runtime_inner.get_ha_server_addr()); + + if let Some(message_store) = self.broker_runtime_inner.message_store() { + response_header.master_flush_offset = + Some(message_store.get_broker_init_max_offset()); + } + response_header.master_address = + Some(self.broker_runtime_inner.get_broker_addr().clone()); + } + } + + response.set_code_ref(ResponseCode::Success); + Some(response) + } +} diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index 5c965e5c..0d0c8288 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -29,6 +29,7 @@ pub mod elect_master_response_header; pub mod empty_header; pub mod end_transaction_request_header; pub mod exchange_ha_info_request_header; +pub mod exchange_ha_info_response_header; pub mod extra_info_util; pub mod get_all_topic_config_response_header; pub mod get_consume_stats_request_header; diff --git a/rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs b/rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs new file mode 100644 index 00000000..0cfd736e --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/exchange_ha_info_response_header.rs @@ -0,0 +1,52 @@ +use cheetah_string::CheetahString; +use rocketmq_macros::RequestHeaderCodec; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodec)] +#[serde(rename_all = "camelCase")] +pub struct ExchangeHaInfoResponseHeader { + pub master_ha_address: Option, + pub master_flush_offset: Option, + pub master_address: Option, +} + +#[cfg(test)] +mod tests { + use cheetah_string::CheetahString; + + use super::*; + + fn create_cheetah_string(value: &str) -> Option { + Some(CheetahString::from(value)) + } + + #[test] + fn serialize_with_all_fields_set() { + let header = ExchangeHaInfoResponseHeader { + master_ha_address: create_cheetah_string("127.0.0.1:10911"), + master_flush_offset: Some(1024), + master_address: create_cheetah_string("127.0.0.1"), + }; + + let serialized = serde_json::to_string(&header).unwrap(); + assert!(serialized.contains("\"masterHaAddress\":\"127.0.0.1:10911\"")); + assert!(serialized.contains("\"masterFlushOffset\":1024")); + assert!(serialized.contains("\"masterAddress\":\"127.0.0.1\"")); + } +} From aef0e5c4c789b2cef0400613963d621e3eefb48c Mon Sep 17 00:00:00 2001 From: jon Date: Sat, 13 Sep 2025 02:10:36 +0800 Subject: [PATCH 14/19] Broker id Change function external logic implementation --- .../admin_broker_processor/notify_min_broker_id_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index 2ad466f2..988b5593 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -180,7 +180,7 @@ impl NotifyMinBrokerChangeIdHandler { if let Some(slave_synchronize) = broker_runtime_inner.slave_synchronize() { if let Some(_master_addr) = slave_synchronize.master_addr() { - // Call the close client method + unimplemented!("Call the close client method") } } From 4f6b8a47f10b10d7806ff9ea7dc652db0639a33c Mon Sep 17 00:00:00 2001 From: jon Date: Sat, 13 Sep 2025 02:19:11 +0800 Subject: [PATCH 15/19] update broker ha info --- .../admin_broker_processor/update_broker_ha_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs index c10efde5..0ce4ceb7 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs @@ -45,7 +45,7 @@ impl UpdateBrokerHaHandler { _channel: Channel, _ctx: ConnectionHandlerContext, _request_code: RequestCode, - request: RemotingCommand, + request: &mut RemotingCommand, ) -> Option { let exchange_request_header = request .decode_command_custom_header::() From a9853b6eee7aa722bb8701e4a34ebaadbac54e86 Mon Sep 17 00:00:00 2001 From: jon Date: Sat, 13 Sep 2025 02:25:24 +0800 Subject: [PATCH 16/19] update logic --- .../admin_broker_processor/notify_min_broker_id_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs index e57502ff..4c077e1c 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs @@ -162,7 +162,7 @@ impl NotifyMinBrokerChangeIdHandler { } //master online - if min_broker_id == MASTER_ID || !min_broker_addr.is_empty() { + if min_broker_id == MASTER_ID && !min_broker_addr.is_empty() { self.on_master_on_line(min_broker_addr, master_ha_addr) .await; } From ff7327f089994eb141af0a9a6bca7abd0e60e972 Mon Sep 17 00:00:00 2001 From: jon Date: Sat, 13 Sep 2025 02:41:55 +0800 Subject: [PATCH 17/19] update logic --- .../admin_broker_processor/update_broker_ha_handler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs index 0ce4ceb7..c2c3c2da 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/update_broker_ha_handler.rs @@ -53,7 +53,7 @@ impl UpdateBrokerHaHandler { let mut response = RemotingCommand::default(); - if let Some(master_ha_addr) = exchange_request_header.master_ha_address { + if let Some(master_ha_addr) = exchange_request_header.master_ha_address.as_ref() { if !master_ha_addr.is_empty() { if let Some(message_store) = self.broker_runtime_inner.message_store() { message_store @@ -63,12 +63,12 @@ impl UpdateBrokerHaHandler { let master_address = exchange_request_header.master_address.unwrap_or_default(); message_store.update_master_address(&master_address); - let is_sync_master_flush_offset_when_startip = self + let should_sync_master_flush_offset_on_startup = self .broker_runtime_inner .message_store_config() .sync_master_flush_offset_when_startup; - if message_store.get_master_flushed_offset() == 0x000 - && is_sync_master_flush_offset_when_startip + if message_store.get_master_flushed_offset() == 0x0000 + && should_sync_master_flush_offset_on_startup { let master_flush_offset = exchange_request_header.master_flush_offset.unwrap(); From a53fca788ab6f0f8e673e877fbed6cd0e399ac2f Mon Sep 17 00:00:00 2001 From: watchgou Date: Mon, 15 Sep 2025 21:11:54 +0800 Subject: [PATCH 18/19] reset master flush offset --- .../src/processor/admin_broker_processor.rs | 12 +++ .../reset_master_flusg_offset_handler.rs | 74 +++++++++++++++++++ rocketmq-remoting/src/protocol/header.rs | 1 + .../reset_master_flush_offset_header.rs | 49 ++++++++++++ 4 files changed, 136 insertions(+) create mode 100644 rocketmq-broker/src/processor/admin_broker_processor/reset_master_flusg_offset_handler.rs create mode 100644 rocketmq-remoting/src/protocol/header/reset_master_flush_offset_header.rs diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index 64245603..c2da0574 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -31,6 +31,7 @@ use crate::processor::admin_broker_processor::broker_config_request_handler::Bro use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler; use crate::processor::admin_broker_processor::notify_min_broker_id_handler::NotifyMinBrokerChangeIdHandler; use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler; +use crate::processor::admin_broker_processor::reset_master_flusg_offset_handler::ResetMasterFlushOffsetHandler; use crate::processor::admin_broker_processor::subscription_group_handler::SubscriptionGroupHandler; use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler; use crate::processor::admin_broker_processor::update_broker_ha_handler::UpdateBrokerHaHandler; @@ -40,6 +41,7 @@ mod broker_config_request_handler; mod consumer_request_handler; mod notify_min_broker_id_handler; mod offset_request_handler; +mod reset_master_flusg_offset_handler; mod subscription_group_handler; mod topic_request_handler; mod update_broker_ha_handler; @@ -56,6 +58,7 @@ pub struct AdminBrokerProcessor { notify_min_broker_handler: NotifyMinBrokerChangeIdHandler, update_broker_ha_handler: UpdateBrokerHaHandler, + reset_master_flusg_offset_handler: ResetMasterFlushOffsetHandler, } impl RequestProcessor for AdminBrokerProcessor @@ -91,6 +94,9 @@ impl AdminBrokerProcessor { let update_broker_ha_handler = UpdateBrokerHaHandler::new(broker_runtime_inner.clone()); + let reset_master_flusg_offset_handler = + ResetMasterFlushOffsetHandler::new(broker_runtime_inner.clone()); + AdminBrokerProcessor { topic_request_handler, broker_config_request_handler, @@ -101,6 +107,7 @@ impl AdminBrokerProcessor { broker_runtime_inner, notify_min_broker_handler, update_broker_ha_handler, + reset_master_flusg_offset_handler, } } } @@ -237,6 +244,11 @@ impl AdminBrokerProcessor { .update_broker_ha_info(channel, ctx, request_code, request) .await } + RequestCode::ResetMasterFlushOffset => { + self.reset_master_flusg_offset_handler + .reset_master_flush_offset(channel, ctx, request_code, request) + .await + } _ => Some(get_unknown_cmd_response(request_code)), } } diff --git a/rocketmq-broker/src/processor/admin_broker_processor/reset_master_flusg_offset_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/reset_master_flusg_offset_handler.rs new file mode 100644 index 00000000..7cf7f90f --- /dev/null +++ b/rocketmq-broker/src/processor/admin_broker_processor/reset_master_flusg_offset_handler.rs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use cheetah_string::CheetahString; +use rocketmq_common::common::mix_all::MASTER_ID; +use rocketmq_remoting::code::request_code::RequestCode; +use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::header::reset_master_flush_offset_header::ResetMasterFlushOffsetHeader; +use rocketmq_remoting::protocol::remoting_command::RemotingCommand; +use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +use rocketmq_rust::ArcMut; +use rocketmq_store::base::message_store::MessageStore; + +use crate::broker_runtime::BrokerRuntimeInner; + +#[derive(Clone)] +pub struct ResetMasterFlushOffsetHandler { + broker_runtime_inner: ArcMut>, +} + +impl ResetMasterFlushOffsetHandler { + pub fn new(broker_runtime_inner: ArcMut>) -> Self { + Self { + broker_runtime_inner, + } + } + + pub async fn reset_master_flush_offset( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + request: &mut RemotingCommand, + ) -> Option { + let mut response = RemotingCommand::default(); + + if self + .broker_runtime_inner + .broker_config() + .broker_identity + .broker_id + != MASTER_ID + { + let request_header = request + .decode_command_custom_header::() + .unwrap(); + + if let Some(maset_flush_offset) = request_header.master_flush_offset { + if let Some(message_store) = self.broker_runtime_inner.message_store() { + message_store.set_master_flushed_offset(maset_flush_offset); + } + } + } + + response.set_code_ref(ResponseCode::Success); + response.set_remark_mut(CheetahString::empty()); + Some(response) + } +} diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index 0d0c8288..cbd9b42b 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -66,6 +66,7 @@ pub mod query_subscription_by_consumer_request_header; pub mod query_topic_consume_by_who_request_header; pub mod query_topics_by_consumer_request_header; pub mod reply_message_request_header; +pub mod reset_master_flush_offset_header; pub mod reset_offset_request_header; pub mod search_offset_response_header; pub mod unlock_batch_mq_request_header; diff --git a/rocketmq-remoting/src/protocol/header/reset_master_flush_offset_header.rs b/rocketmq-remoting/src/protocol/header/reset_master_flush_offset_header.rs new file mode 100644 index 00000000..168fb309 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/reset_master_flush_offset_header.rs @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Debug, Serialize, Deserialize, RequestHeaderCodec)] +#[serde(rename_all = "camelCase")] +pub struct ResetMasterFlushOffsetHeader { + pub master_flush_offset: Option, +} + +#[cfg(test)] +mod tests { + use crate::protocol::header::reset_master_flush_offset_header::ResetMasterFlushOffsetHeader; + + #[test] + fn reset_master_flush_offset_header_serializes_correctly() { + let header = ResetMasterFlushOffsetHeader { + master_flush_offset: Some(4231), + }; + + let serialized = serde_json::to_string(&header).unwrap(); + let expected = r#"{"masterFlushOffset":4231}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn reset_master_flush_offset_header_deserializes_correctly() { + let data = r#"{"masterFlushOffset":9527}"#; + let header: ResetMasterFlushOffsetHeader = serde_json::from_str(data).unwrap(); + assert_eq!(header.master_flush_offset, Some(9527)); + } +} From eff1404d18f2cf85baab79ebc72307330c8b7321 Mon Sep 17 00:00:00 2001 From: watchgou Date: Tue, 16 Sep 2025 18:10:25 +0800 Subject: [PATCH 19/19] ffi::c uchar, which does not adapt to macos system, it is recommended to change to c char --- rocketmq-store/src/utils/ffi.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rocketmq-store/src/utils/ffi.rs b/rocketmq-store/src/utils/ffi.rs index 8b5b8f41..6c98bf63 100644 --- a/rocketmq-store/src/utils/ffi.rs +++ b/rocketmq-store/src/utils/ffi.rs @@ -93,9 +93,9 @@ pub fn mincore(addr: *const u8, len: usize, vec: *const u8) -> i32 { { use std::ffi::c_void; - use libc::c_uchar; + use libc::c_char; - unsafe { libc::mincore(addr as *mut c_void, len, vec as *mut c_uchar) } + unsafe { libc::mincore(addr as *mut c_void, len, vec as *mut c_char) } } #[cfg(windows)] {