-
Notifications
You must be signed in to change notification settings - Fork 175
[ISSUE #4075]🚀Implement Broker on_master_offline #4076
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -1455,7 +1455,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> { | |||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
if let Some(slave_synchronize) = &mut self.slave_synchronize { | ||||||||||||||||||||||||||||||||||||
slave_synchronize.set_master_addr(Some(result.master_addr)); | ||||||||||||||||||||||||||||||||||||
slave_synchronize.set_master_addr(Some(&result.master_addr)); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
if check_order_config { | ||||||||||||||||||||||||||||||||||||
if let Some(topic_config_manager) = &mut self.topic_config_manager { | ||||||||||||||||||||||||||||||||||||
|
@@ -2063,10 +2063,25 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> { | |||||||||||||||||||||||||||||||||||
self.slave_synchronize.as_ref() | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
#[inline] | ||||||||||||||||||||||||||||||||||||
pub fn slave_synchronize_unchecked(&self) -> &SlaveSynchronize<MS> { | ||||||||||||||||||||||||||||||||||||
unsafe { self.slave_synchronize.as_ref().unwrap_unchecked() } | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
#[inline] | ||||||||||||||||||||||||||||||||||||
pub fn slave_synchronize_mut(&mut self) -> Option<&mut SlaveSynchronize<MS>> { | ||||||||||||||||||||||||||||||||||||
self.slave_synchronize.as_mut() | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
#[inline] | ||||||||||||||||||||||||||||||||||||
pub fn slave_synchronize_mut_unchecked(&mut self) -> &mut SlaveSynchronize<MS> { | ||||||||||||||||||||||||||||||||||||
unsafe { self.slave_synchronize.as_mut().unwrap_unchecked() } | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
Comment on lines
+2076
to
+2079
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These unchecked accessors rely on invariants about self.slave_synchronize being Some but provide no Safety docs specifying the required preconditions. Please add doc-comments with a Safety section detailing the invariant and when these may be called, or keep them private and prefer the checked variants at call sites. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
#[inline] | ||||||||||||||||||||||||||||||||||||
pub fn update_slave_master_addr(&mut self, master_addr: Option<CheetahString>) { | ||||||||||||||||||||||||||||||||||||
if let Some(ref mut slave) = self.slave_synchronize { | ||||||||||||||||||||||||||||||||||||
slave.set_master_addr(master_addr); | ||||||||||||||||||||||||||||||||||||
slave.set_master_addr(master_addr.as_ref()); | ||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||
Comment on lines
2082
to
2085
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This clones master_addr inside set_master_addr even though an owned Option is already available, causing an avoidable allocation. Either (a) change set_master_addr to take Option by value and pass master_addr directly, or (b) change update_slave_master_addr to accept Option<&CheetahString> to avoid constructing an owned value. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
@@ -2702,7 +2717,16 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> { | |||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
async fn on_master_offline(&mut self) { | ||||||||||||||||||||||||||||||||||||
error!("unimplemented") | ||||||||||||||||||||||||||||||||||||
let slave_synchronize = self.slave_synchronize_unchecked(); | ||||||||||||||||||||||||||||||||||||
if let Some(master_addr) = slave_synchronize.master_addr() { | ||||||||||||||||||||||||||||||||||||
if !master_addr.is_empty() { | ||||||||||||||||||||||||||||||||||||
//close channels | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
self.slave_synchronize_mut_unchecked().set_master_addr(None); | ||||||||||||||||||||||||||||||||||||
Comment on lines
+2720
to
+2726
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using unwrap_unchecked via slave_synchronize_unchecked()/slave_synchronize_mut_unchecked() here is UB if self.slave_synchronize is None. Replace with a safe path that gracefully no-ops when absent.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||||||||||
self.message_store_unchecked_mut() | ||||||||||||||||||||||||||||||||||||
.update_ha_master_address("") | ||||||||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
async fn send_heartbeat(&self) { | ||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,8 +47,8 @@ where | |
self.master_addr.as_ref() | ||
} | ||
|
||
pub fn set_master_addr(&mut self, addr: Option<impl Into<CheetahString>>) { | ||
let addr = addr.map(|addr| addr.into()); | ||
pub fn set_master_addr(&mut self, addr: Option<&CheetahString>) { | ||
let addr = addr.cloned(); | ||
if self.master_addr == addr { | ||
return; | ||
Comment on lines
+50
to
53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] Changing the signature from Option<impl Into> to Option<&CheetahString> reduces ergonomics and forces an internal clone, losing the zero-cost move path for owned values. Consider restoring the owned-taking API (e.g., pub fn set_master_addr(&mut self, addr: Option)) and, if needed, adding a separate convenience for borrowed inputs that clones explicitly. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These unchecked accessors rely on invariants about self.slave_synchronize being Some but provide no Safety docs specifying the required preconditions. Please add doc-comments with a Safety section detailing the invariant and when these may be called, or keep them private and prefer the checked variants at call sites.
Copilot uses AI. Check for mistakes.