Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
}
}
if let Some(slave_synchronize) = &mut self.slave_synchronize {
slave_synchronize.set_master_addr(Some(result.master_addr));
slave_synchronize.set_master_addr(Some(&result.master_addr));
}
if check_order_config {
if let Some(topic_config_manager) = &mut self.topic_config_manager {
Expand Down Expand Up @@ -2063,10 +2063,25 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
self.slave_synchronize.as_ref()
}

#[inline]
pub fn slave_synchronize_unchecked(&self) -> &SlaveSynchronize<MS> {
unsafe { self.slave_synchronize.as_ref().unwrap_unchecked() }
}
Comment on lines +2066 to +2069
Copy link

Copilot AI Sep 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These unchecked accessors rely on invariants about self.slave_synchronize being Some but provide no Safety docs specifying the required preconditions. Please add doc-comments with a Safety section detailing the invariant and when these may be called, or keep them private and prefer the checked variants at call sites.

Copilot uses AI. Check for mistakes.


#[inline]
pub fn slave_synchronize_mut(&mut self) -> Option<&mut SlaveSynchronize<MS>> {
self.slave_synchronize.as_mut()
}

#[inline]
pub fn slave_synchronize_mut_unchecked(&mut self) -> &mut SlaveSynchronize<MS> {
unsafe { self.slave_synchronize.as_mut().unwrap_unchecked() }
}
Comment on lines +2076 to +2079
Copy link

Copilot AI Sep 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These unchecked accessors rely on invariants about self.slave_synchronize being Some but provide no Safety docs specifying the required preconditions. Please add doc-comments with a Safety section detailing the invariant and when these may be called, or keep them private and prefer the checked variants at call sites.

Copilot uses AI. Check for mistakes.


#[inline]
pub fn update_slave_master_addr(&mut self, master_addr: Option<CheetahString>) {
if let Some(ref mut slave) = self.slave_synchronize {
slave.set_master_addr(master_addr);
slave.set_master_addr(master_addr.as_ref());
};
Comment on lines 2082 to 2085
Copy link

Copilot AI Sep 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clones master_addr inside set_master_addr even though an owned Option is already available, causing an avoidable allocation. Either (a) change set_master_addr to take Option by value and pass master_addr directly, or (b) change update_slave_master_addr to accept Option<&CheetahString> to avoid constructing an owned value.

Copilot uses AI. Check for mistakes.

}

Expand Down Expand Up @@ -2702,7 +2717,16 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
}

async fn on_master_offline(&mut self) {
error!("unimplemented")
let slave_synchronize = self.slave_synchronize_unchecked();
if let Some(master_addr) = slave_synchronize.master_addr() {
if !master_addr.is_empty() {
//close channels
}
}
self.slave_synchronize_mut_unchecked().set_master_addr(None);
Comment on lines +2720 to +2726
Copy link

Copilot AI Sep 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using unwrap_unchecked via slave_synchronize_unchecked()/slave_synchronize_mut_unchecked() here is UB if self.slave_synchronize is None. Replace with a safe path that gracefully no-ops when absent.

Suggested change
let slave_synchronize = self.slave_synchronize_unchecked();
if let Some(master_addr) = slave_synchronize.master_addr() {
if !master_addr.is_empty() {
//close channels
}
}
self.slave_synchronize_mut_unchecked().set_master_addr(None);
if let Some(slave_synchronize) = self.slave_synchronize() {
if let Some(master_addr) = slave_synchronize.master_addr() {
if !master_addr.is_empty() {
//close channels
}
}
}
if let Some(slave_synchronize_mut) = self.slave_synchronize_mut() {
slave_synchronize_mut.set_master_addr(None);
}

Copilot uses AI. Check for mistakes.

self.message_store_unchecked_mut()
.update_ha_master_address("")
.await
}

async fn send_heartbeat(&self) {
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/slave/slave_synchronize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ where
self.master_addr.as_ref()
}

pub fn set_master_addr(&mut self, addr: Option<impl Into<CheetahString>>) {
let addr = addr.map(|addr| addr.into());
pub fn set_master_addr(&mut self, addr: Option<&CheetahString>) {
let addr = addr.cloned();
if self.master_addr == addr {
return;
Comment on lines +50 to 53
Copy link

Copilot AI Sep 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Changing the signature from Option<impl Into> to Option<&CheetahString> reduces ergonomics and forces an internal clone, losing the zero-cost move path for owned values. Consider restoring the owned-taking API (e.g., pub fn set_master_addr(&mut self, addr: Option)) and, if needed, adding a separate convenience for borrowed inputs that clones explicitly.

Copilot uses AI. Check for mistakes.

}
Expand Down
Loading