Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2542,16 +2542,15 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
let min_broker_addr = broker_member_group
.broker_addrs
.get(&min_broker_id)
.cloned()
.unwrap_or_default();
.cloned();
BrokerRuntimeInner::update_min_broker(this, min_broker_id, min_broker_addr).await;
}
}

pub async fn update_min_broker(
this: &ArcMut<Self>,
min_broker_id: u64,
min_broker_addr: CheetahString,
min_broker_addr: Option<CheetahString>,
) {
if this.broker_config.enable_slave_acting_master
&& this.broker_config.broker_identity.broker_id != MASTER_ID
Expand Down Expand Up @@ -2678,22 +2677,22 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
async fn on_min_broker_change(
&mut self,
min_broker_id: u64,
min_broker_addr: CheetahString,
min_broker_addr: Option<CheetahString>,
offline_broker_addr: Option<CheetahString>,
master_ha_addr: Option<CheetahString>,
) {
let min_broker_id_in_group_old = self.min_broker_id_in_group.load(Ordering::SeqCst);
let mut min_broker_addr_in_group_old = self.min_broker_addr_in_group.lock().await;
info!(
"Min broker changed, old: {}-{:?}, new {}-{}",
"Min broker changed, old: {}-{:?}, new {}-{:?}",
min_broker_id_in_group_old,
min_broker_addr_in_group_old,
min_broker_id,
min_broker_addr
);
self.min_broker_id_in_group
.store(min_broker_id, Ordering::SeqCst);
*min_broker_addr_in_group_old = Some(min_broker_addr.clone());
*min_broker_addr_in_group_old = min_broker_addr.clone();
drop(min_broker_addr_in_group_old);
self.change_special_service_status(
self.broker_config.broker_identity.broker_id
Expand All @@ -2707,7 +2706,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
self.on_master_offline().await;
}

if min_broker_id == MASTER_ID && !min_broker_addr.is_empty() {
if min_broker_id == MASTER_ID && min_broker_addr.is_some() {
//master online
self.on_master_on_line(min_broker_addr, master_ha_addr)
.await;
Expand All @@ -2723,10 +2722,50 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {

async fn on_master_on_line(
&mut self,
_min_broker_addr: CheetahString,
_master_ha_addr: Option<CheetahString>,
min_broker_addr: Option<CheetahString>,
master_ha_addr: Option<CheetahString>,
) {
error!("unimplemented")
let need_sync_master_flush_offset =
self.message_store_unchecked().get_master_flushed_offset() == 0
&& self.message_store_config.all_ack_in_sync_state_set;
if master_ha_addr.is_none() || need_sync_master_flush_offset {
match self
.broker_outer_api
.retrieve_broker_ha_info(min_broker_addr.as_ref())
.await
{
Ok(broker_sync_info) => {
if need_sync_master_flush_offset {
Comment on lines +2728 to +2738
Copy link

Copilot AI Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable name need_sync_master_flush_offset is verbose and could be shortened to should_sync_flush_offset for better readability while maintaining clarity.

Suggested change
let need_sync_master_flush_offset =
self.message_store_unchecked().get_master_flushed_offset() == 0
&& self.message_store_config.all_ack_in_sync_state_set;
if master_ha_addr.is_none() || need_sync_master_flush_offset {
match self
.broker_outer_api
.retrieve_broker_ha_info(min_broker_addr.as_ref())
.await
{
Ok(broker_sync_info) => {
if need_sync_master_flush_offset {
let should_sync_flush_offset =
self.message_store_unchecked().get_master_flushed_offset() == 0
&& self.message_store_config.all_ack_in_sync_state_set;
if master_ha_addr.is_none() || should_sync_flush_offset {
match self
.broker_outer_api
.retrieve_broker_ha_info(min_broker_addr.as_ref())
.await
{
Ok(broker_sync_info) => {
if should_sync_flush_offset {

Copilot uses AI. Check for mistakes.

info!(
"Set master flush offset in slave to {}",
broker_sync_info.master_flush_offset,
);
self.message_store_unchecked()
.set_master_flushed_offset(broker_sync_info.master_flush_offset);
}
if master_ha_addr.is_none() {
let message_store = self.message_store_unchecked();
if let Some(master_ha_address) = &broker_sync_info.master_ha_address {
message_store
.update_ha_master_address(master_ha_address.as_str())
.await;
}
if let Some(master_address) = &broker_sync_info.master_address {
message_store.update_master_address(master_address);
}
}
}
Err(e) => {
error!("retrieve master ha info exception, {}", e);
}
}
}
if let Some(master_ha_addr_) = master_ha_addr {
self.message_store_unchecked_mut()
.update_ha_master_address(master_ha_addr_.as_str())
.await;
}
self.message_store_unchecked().wakeup_ha_client();
}

async fn on_master_offline(&mut self) {
Expand Down
32 changes: 32 additions & 0 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ use rocketmq_remoting::protocol::body::message_request_mode_serialize_wrapper::M
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
use rocketmq_remoting::protocol::body::subscription_group_wrapper::SubscriptionGroupWrapper;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
use rocketmq_remoting::protocol::broker_sync_info::BrokerSyncInfo;
use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
use rocketmq_remoting::protocol::header::exchange_ha_info_request_header::ExchangeHAInfoRequestHeader;
use rocketmq_remoting::protocol::header::exchange_ha_info_response_header::ExchangeHaInfoResponseHeader;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
Expand Down Expand Up @@ -786,6 +788,36 @@ impl BrokerOuterAPI {
))
}
}

pub async fn retrieve_broker_ha_info(
&self,
master_broker_addr: Option<&CheetahString>,
) -> rocketmq_error::RocketMQResult<BrokerSyncInfo> {
let request_header = ExchangeHAInfoRequestHeader::default();
let request = RemotingCommand::create_request_command(
RequestCode::ExchangeBrokerHaInfo,
request_header,
);
let response = self
.remoting_client
.invoke_async(master_broker_addr, request, 3000)
.await?;

if ResponseCode::from(response.code()) == ResponseCode::Success {
let header = response.decode_command_custom_header::<ExchangeHaInfoResponseHeader>()?;
return Ok(BrokerSyncInfo {
master_address: header.master_address,
master_ha_address: header.master_ha_address,
master_flush_offset: header.master_flush_offset.unwrap_or(0),
});
}

Err(RocketmqError::MQBrokerError(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
master_broker_addr.map_or("".to_string(), |s| s.to_string()),
))
}
}

fn process_pull_result(
Expand Down
Loading