-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4050]♻️Replace manual heartbeat scheduling with a fixed-rate task manager #4051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -1190,33 +1190,17 @@ impl BrokerRuntime { | |||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if self.inner.broker_config.enable_slave_acting_master { | ||||||||||||||||||||||||||||||||||||||||||||||
self.schedule_send_heartbeat(); | ||||||||||||||||||||||||||||||||||||||||||||||
let broker_runtime_inner = self.inner.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
self.broker_runtime | ||||||||||||||||||||||||||||||||||||||||||||||
.as_ref() | ||||||||||||||||||||||||||||||||||||||||||||||
.unwrap() | ||||||||||||||||||||||||||||||||||||||||||||||
.get_handle() | ||||||||||||||||||||||||||||||||||||||||||||||
.spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||||
let period = Duration::from_secs(1); | ||||||||||||||||||||||||||||||||||||||||||||||
let initial_delay = Duration::from_millis( | ||||||||||||||||||||||||||||||||||||||||||||||
broker_runtime_inner | ||||||||||||||||||||||||||||||||||||||||||||||
.broker_config | ||||||||||||||||||||||||||||||||||||||||||||||
.sync_broker_member_group_period, | ||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||
tokio::time::sleep(initial_delay).await; | ||||||||||||||||||||||||||||||||||||||||||||||
loop { | ||||||||||||||||||||||||||||||||||||||||||||||
// record current execution time | ||||||||||||||||||||||||||||||||||||||||||||||
let current_execution_time = tokio::time::Instant::now(); | ||||||||||||||||||||||||||||||||||||||||||||||
// execute task | ||||||||||||||||||||||||||||||||||||||||||||||
broker_runtime_inner.sync_broker_member_group(); | ||||||||||||||||||||||||||||||||||||||||||||||
// Calculate the time of the next execution | ||||||||||||||||||||||||||||||||||||||||||||||
let next_execution_time = current_execution_time + period; | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// Wait until the next execution | ||||||||||||||||||||||||||||||||||||||||||||||
let delay = next_execution_time | ||||||||||||||||||||||||||||||||||||||||||||||
.saturating_duration_since(tokio::time::Instant::now()); | ||||||||||||||||||||||||||||||||||||||||||||||
tokio::time::sleep(delay).await; | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||
let sync_broker_member_group_period = | ||||||||||||||||||||||||||||||||||||||||||||||
self.inner.broker_config.sync_broker_member_group_period; | ||||||||||||||||||||||||||||||||||||||||||||||
let inner_ = self.inner.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||
self.scheduled_task_manager.add_fixed_rate_task_async( | ||||||||||||||||||||||||||||||||||||||||||||||
Duration::from_millis(1000), | ||||||||||||||||||||||||||||||||||||||||||||||
Duration::from_millis(sync_broker_member_group_period), | ||||||||||||||||||||||||||||||||||||||||||||||
async move |_ctx| { | ||||||||||||||||||||||||||||||||||||||||||||||
inner_.sync_broker_member_group().await; | ||||||||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+1193
to
+1203
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix build: don’t await sync_broker_member_group() (E0277). sync_broker_member_group is synchronous (returns ()), so awaiting it triggers E0277. Call it directly. - inner_.sync_broker_member_group().await;
+ inner_.sync_broker_member_group(); 📝 Committable suggestion
Suggested change
🧰 Tools🪛 GitHub Actions: CI[error] 1200-1200: Command 'cargo clippy -- -D warnings' failed with Rust error E0277: '()' is not a future. The code awaits inner_.sync_broker_member_group() at rocketmq-broker/src/broker_runtime.rs:1200:55. Remove '.await' or ensure the function returns a Future. 🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if self.inner.broker_config.enable_controller_mode { | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1270,6 +1254,10 @@ impl BrokerRuntime { | |||||||||||||||||||||||||||||||||||||||||||||
Duration::from_millis(1000), | ||||||||||||||||||||||||||||||||||||||||||||||
Duration::from_millis(broker_heartbeat_interval), | ||||||||||||||||||||||||||||||||||||||||||||||
async move |_ctx| { | ||||||||||||||||||||||||||||||||||||||||||||||
if inner_.is_isolated.load(Ordering::Acquire) { | ||||||||||||||||||||||||||||||||||||||||||||||
info!("Skip send heartbeat for broker is isolated"); | ||||||||||||||||||||||||||||||||||||||||||||||
return Ok(()); | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
inner_.send_heartbeat().await; | ||||||||||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -2478,7 +2466,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> { | |||||||||||||||||||||||||||||||||||||||||||||
CheetahString::from_string(addr) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
pub fn sync_broker_member_group(&self) { | ||||||||||||||||||||||||||||||||||||||||||||||
async fn sync_broker_member_group(&self) { | ||||||||||||||||||||||||||||||||||||||||||||||
warn!("sync_broker_member_group not implemented"); | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded 1000ms initial delay should use a named constant or configuration value for better maintainability, similar to how
sync_broker_member_group_period
is used.Copilot uses AI. Check for mistakes.