Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,18 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
&self.pop_inflight_message_counter
}

#[inline]
pub fn slave_synchronize(&self) -> &Option<SlaveSynchronize<MS>> {
&self.slave_synchronize
}

#[inline]
pub fn update_slave_master_addr(&mut self, master_addr: CheetahString) {
if let Some(ref mut slave) = self.slave_synchronize {
slave.set_master_addr(master_addr);
};
}

#[inline]
pub fn set_store_host(&mut self, store_host: SocketAddr) {
self.store_host = store_host;
Expand Down
12 changes: 12 additions & 0 deletions rocketmq-broker/src/processor/admin_broker_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::processor::admin_broker_processor::notify_min_broker_id_handler::Noti
use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler;
use crate::processor::admin_broker_processor::subscription_group_handler::SubscriptionGroupHandler;
use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler;
use crate::processor::admin_broker_processor::update_broker_ha_handler::UpdateBrokerHaHandler;

mod batch_mq_handler;
mod broker_config_request_handler;
Expand All @@ -41,6 +42,7 @@ mod notify_min_broker_id_handler;
mod offset_request_handler;
mod subscription_group_handler;
mod topic_request_handler;
mod update_broker_ha_handler;

pub struct AdminBrokerProcessor<MS: MessageStore> {
topic_request_handler: TopicRequestHandler<MS>,
Expand All @@ -53,6 +55,7 @@ pub struct AdminBrokerProcessor<MS: MessageStore> {
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,

notify_min_broker_handler: NotifyMinBrokerChangeIdHandler<MS>,
update_broker_ha_handler: UpdateBrokerHaHandler<MS>,
}

impl<MS> RequestProcessor for AdminBrokerProcessor<MS>
Expand Down Expand Up @@ -85,6 +88,9 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {

let notify_min_broker_handler =
NotifyMinBrokerChangeIdHandler::new(broker_runtime_inner.clone());

let update_broker_ha_handler = UpdateBrokerHaHandler::new(broker_runtime_inner.clone());

AdminBrokerProcessor {
topic_request_handler,
broker_config_request_handler,
Expand All @@ -94,6 +100,7 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
subscription_group_handler,
broker_runtime_inner,
notify_min_broker_handler,
update_broker_ha_handler,
}
}
}
Expand Down Expand Up @@ -225,6 +232,11 @@ impl<MS: MessageStore> AdminBrokerProcessor<MS> {
.notify_min_broker_id_change(channel, ctx, request_code, request)
.await
}
RequestCode::ExchangeBrokerHaInfo => {
self.update_broker_ha_handler
.update_broker_ha_info(channel, ctx, request_code, request)
.await
}
_ => Some(get_unknown_cmd_response(request_code)),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,53 @@
* limitations under the License.
*/

use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use cheetah_string::CheetahString;
use rocketmq_common::common::mix_all::MASTER_ID;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::namesrv::brokerid_change_request_header::NotifyMinBrokerIdChangeRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_rust::ArcMut;
use rocketmq_rust::RocketMQTokioRwLock;
use rocketmq_store::base::message_store::MessageStore;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::broker_runtime::BrokerRuntimeInner;

#[derive(Clone)]
pub struct NotifyMinBrokerChangeIdHandler<MS: MessageStore> {
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
lock: Arc<RocketMQTokioRwLock<MinBrokerIngroup>>,
}

#[derive(Clone)]
struct MinBrokerIngroup {
min_broker_id_in_group: Option<u64>,
min_broker_addr_in_group: Arc<CheetahString>,
}

impl MinBrokerIngroup {
fn new() -> Self {
Self {
min_broker_id_in_group: Some(MASTER_ID),
min_broker_addr_in_group: Arc::new(CheetahString::empty()),
}
}
}

impl<MS: MessageStore> NotifyMinBrokerChangeIdHandler<MS> {
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
Self {
broker_runtime_inner,
lock: Arc::new(RocketMQTokioRwLock::new(MinBrokerIngroup::new())),
}
}

Expand All @@ -50,24 +76,155 @@ impl<MS: MessageStore> NotifyMinBrokerChangeIdHandler<MS> {
.decode_command_custom_header::<NotifyMinBrokerIdChangeRequestHeader>()
.unwrap();

let current_broker_id = self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id;
let broker_config = self.broker_runtime_inner.broker_config();

let latest_broker_id = change_header
.min_broker_id
.expect("min broker id not must be present");

warn!(
"min broker id changed, prev {}, new {}",
current_broker_id,
change_header
.min_broker_id
.expect("min broker id not must be present")
broker_config.broker_identity.broker_id, latest_broker_id
);

// TODO Implement update broker id method in the near future
self.update_min_broker(change_header).await;

let mut response = RemotingCommand::default();
response.set_code_ref(ResponseCode::Success);
Some(response)
}

async fn update_min_broker(&mut self, change_header: NotifyMinBrokerIdChangeRequestHeader) {
let broker_config = self.broker_runtime_inner.broker_config();

if broker_config.enable_slave_acting_master
&& broker_config.broker_identity.broker_id != MASTER_ID
{
if self
.lock
.try_write_timeout(Duration::from_millis(3000))
.await
.is_some()
{
if let Some(min_broker_id) = change_header.min_broker_id {
if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() {
// on min broker change
let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();

Comment on lines +112 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Don't unwrap min_broker_addr

min_broker_addr may be absent; unwrap() panics. Use a safe default or branch.

Covered in the larger diff above by using as_deref().unwrap_or_default().

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs
around lines 112 to 113, the code calls
change_header.min_broker_addr.as_deref().unwrap() which can panic if
min_broker_addr is None; replace this unwrap with a safe fallback or branch —
for example use as_deref().unwrap_or_default() to get an empty string when
absent or explicitly match/if-let on min_broker_addr to handle the None case and
proceed accordingly, ensuring no panics.

self.on_min_broker_change(
min_broker_id,
min_broker_addr,
&change_header.offline_broker_addr,
&change_header.ha_broker_addr,
)
.await;
}
}
} else {
error!("Update min broker failed");
}
}
Comment on lines +103 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Fix lock misuse: ignoring write-guard and read-under-write causes race/deadlock

  • The result of try_write_timeout(...) is discarded; you immediately reacquire a fresh write-lock later. This is TOCTOU and defeats the timeout.
  • In on_min_broker_change you hold a write lock and then await a read lock from the same RwLock (Lines 148-150) — this can deadlock.

Refactor to (a) compare against the tracked in-group value under a read lock, (b) avoid the unused try_write_timeout, and (c) compute should_start without re-locking:

@@
-            if self
-                .lock
-                .try_write_timeout(Duration::from_millis(3000))
-                .await
-                .is_some()
-            {
-                if let Some(min_broker_id) = change_header.min_broker_id {
-                    if min_broker_id != self.broker_runtime_inner.get_min_broker_id_in_group() {
-                        // on min broker change
-                        let min_broker_addr = change_header.min_broker_addr.as_deref().unwrap();
-
-                        self.on_min_broker_change(
-                            min_broker_id,
-                            min_broker_addr,
-                            &change_header.offline_broker_addr,
-                            &change_header.ha_broker_addr,
-                        )
-                        .await;
-                    }
-                }
-            } else {
-                error!("Update min broker failed");
-            }
+            if let Some(min_broker_id) = change_header.min_broker_id {
+                let current = self.lock.read().await.min_broker_id_in_group;
+                if current != Some(min_broker_id) {
+                    let min_broker_addr = change_header
+                        .min_broker_addr
+                        .as_deref()
+                        .unwrap_or_default();
+                    self.on_min_broker_change(
+                        min_broker_id,
+                        min_broker_addr,
+                        &change_header.offline_broker_addr,
+                        &change_header.ha_broker_addr,
+                    )
+                    .await;
+                }
+            } else {
+                warn!("min_broker_id missing; skip update_min_broker");
+            }
@@
-        info!(
-            "Min broker changed, old: {}-{}, new {}-{}",
-            self.broker_runtime_inner.get_min_broker_id_in_group(),
-            self.broker_runtime_inner.get_broker_addr(),
-            min_broker_id,
-            min_broker_addr
-        );
-
-        let mut lock_guard = self.lock.write().await;
-        lock_guard.min_broker_id_in_group = Some(min_broker_id);
-        lock_guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));
-
-        let should_start = self.broker_runtime_inner.get_min_broker_id_in_group()
-            == self.lock.read().await.min_broker_id_in_group.unwrap();
+        let mut guard = self.lock.write().await;
+        let prev_id = guard.min_broker_id_in_group;
+        let prev_addr = guard.min_broker_addr_in_group.clone();
+        info!(
+            "Min broker changed, old: {:?}-{}, new {}-{}",
+            prev_id, prev_addr, min_broker_id, min_broker_addr
+        );
+        guard.min_broker_id_in_group = Some(min_broker_id);
+        guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));
+        drop(guard);
+
+        let should_start =
+            self.broker_runtime_inner.get_min_broker_id_in_group() == min_broker_id;

Also applies to: 136-151

}

async fn on_min_broker_change(
&self,
min_broker_id: u64,
min_broker_addr: &str,
offline_broker_addr: &Option<CheetahString>,
master_ha_addr: &Option<CheetahString>,
) {
info!(
"Min broker changed, old: {}-{}, new {}-{}",
self.broker_runtime_inner.get_min_broker_id_in_group(),
self.broker_runtime_inner.get_broker_addr(),
min_broker_id,
min_broker_addr
);

let mut lock_guard = self.lock.write().await;
lock_guard.min_broker_id_in_group = Some(min_broker_id);
lock_guard.min_broker_addr_in_group = Arc::new(CheetahString::from_slice(min_broker_addr));

let should_start = self.broker_runtime_inner.get_min_broker_id_in_group()
== self.lock.read().await.min_broker_id_in_group.unwrap();

self.change_special_service_status(should_start).await;

// master offline
if let Some(offline_broker_addr) = offline_broker_addr {
if let Some(slave_sync) = self.broker_runtime_inner.slave_synchronize() {
if let Some(master_addr) = slave_sync.master_addr() {
if !master_addr.is_empty() && offline_broker_addr.eq(master_addr.deref()) {
self.on_master_offline().await;
}
}
}
}

//master online
if min_broker_id == MASTER_ID || !min_broker_addr.is_empty() {
self.on_master_on_line(min_broker_addr, master_ha_addr)
.await;
}
}

async fn change_special_service_status(&self, should_start: bool) {
self.broker_runtime_inner
.mut_from_ref()
.change_special_service_status(should_start)
.await;
}

async fn on_master_offline(&self) {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();

if let Some(slave_synchronize) = broker_runtime_inner.slave_synchronize() {
if let Some(_master_addr) = slave_synchronize.master_addr() {
// Call the close client method
}
}

broker_runtime_inner.update_slave_master_addr(CheetahString::empty());
if let Some(message_store) = broker_runtime_inner.message_store() {
message_store.update_master_address(&CheetahString::empty());
}
}

async fn on_master_on_line(
&self,
_min_broker_addr: &str,
master_ha_addr: &Option<CheetahString>,
) {
let need_sync_master_flush_offset =
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store.get_master_flushed_offset() == 0x0000
&& self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup
} else {
false
};

if master_ha_addr.is_none() || need_sync_master_flush_offset {
if need_sync_master_flush_offset {
unimplemented!();
}

if master_ha_addr.is_none() {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
if let Some(_message_store) = broker_runtime_inner.message_store() {
unimplemented!("");
}
}
}
}
Comment on lines +198 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Replace unimplemented!() with non-fatal behavior

unimplemented!() will panic the broker at runtime. Log and return until the implementation is ready.

-        if master_ha_addr.is_none() || need_sync_master_flush_offset {
-            if need_sync_master_flush_offset {
-                unimplemented!();
-            }
-
-            if master_ha_addr.is_none() {
-                let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
-                if let Some(_message_store) = broker_runtime_inner.message_store() {
-                    unimplemented!("");
-                }
-            }
-        }
+        if master_ha_addr.is_none() || need_sync_master_flush_offset {
+            if need_sync_master_flush_offset {
+                warn!("Syncing master flush offset is not implemented yet");
+            }
+            if master_ha_addr.is_none() {
+                let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
+                if let Some(_message_store) = broker_runtime_inner.message_store() {
+                    warn!("Updating message store with master HA addr is not implemented yet");
+                }
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let need_sync_master_flush_offset =
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store.get_master_flushed_offset() == 0x0000
&& self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup
} else {
false
};
if master_ha_addr.is_none() || need_sync_master_flush_offset {
if need_sync_master_flush_offset {
unimplemented!();
}
if master_ha_addr.is_none() {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
if let Some(_message_store) = broker_runtime_inner.message_store() {
unimplemented!("");
}
}
}
}
let need_sync_master_flush_offset =
if let Some(message_store) = self.broker_runtime_inner.message_store() {
message_store.get_master_flushed_offset() == 0x0000
&& self
.broker_runtime_inner
.message_store_config()
.sync_master_flush_offset_when_startup
} else {
false
};
if master_ha_addr.is_none() || need_sync_master_flush_offset {
if need_sync_master_flush_offset {
warn!("Syncing master flush offset is not implemented yet");
}
if master_ha_addr.is_none() {
let broker_runtime_inner = self.broker_runtime_inner.mut_from_ref();
if let Some(_message_store) = broker_runtime_inner.message_store() {
warn!("Updating message store with master HA addr is not implemented yet");
}
}
}
}
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/notify_min_broker_id_handler.rs
around lines 198 to 221, replace the unimplemented!() calls with non-fatal
behavior: log a warning or info including context (e.g., why master flush sync
or message_store branch is not implemented) and then return early from the
handler (or otherwise exit this code path) instead of panicking; ensure you use
the existing logger instance available on self or broker_runtime_inner, include
relevant variables in the message (like need_sync_master_flush_offset or whether
message_store is present), and avoid changing control flow beyond
returning/short-circuiting so the broker continues running.


pub async fn get_min_broker_id_in_group(&self) -> Option<u64> {
self.lock.read().await.min_broker_id_in_group
}

pub async fn get_min_broker_addr_in_group(&self) -> Arc<CheetahString> {
self.lock.read().await.min_broker_addr_in_group.clone()
}
}
Loading