Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,33 +1190,17 @@ impl BrokerRuntime {

if self.inner.broker_config.enable_slave_acting_master {
self.schedule_send_heartbeat();
let broker_runtime_inner = self.inner.clone();
self.broker_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
let period = Duration::from_secs(1);
let initial_delay = Duration::from_millis(
broker_runtime_inner
.broker_config
.sync_broker_member_group_period,
);
tokio::time::sleep(initial_delay).await;
loop {
// record current execution time
let current_execution_time = tokio::time::Instant::now();
// execute task
broker_runtime_inner.sync_broker_member_group();
// Calculate the time of the next execution
let next_execution_time = current_execution_time + period;

// Wait until the next execution
let delay = next_execution_time
.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;
}
});
let sync_broker_member_group_period =
self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded 1000ms initial delay should use a named constant or configuration value for better maintainability, similar to how sync_broker_member_group_period is used.

Copilot uses AI. Check for mistakes.

Duration::from_millis(sync_broker_member_group_period),
async move |_ctx| {
inner_.sync_broker_member_group().await;
Ok(())
},
);
Comment on lines +1193 to +1203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix build: don’t await sync_broker_member_group() (E0277).

sync_broker_member_group is synchronous (returns ()), so awaiting it triggers E0277. Call it directly.

-                    inner_.sync_broker_member_group().await;
+                    inner_.sync_broker_member_group();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let sync_broker_member_group_period =
self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Duration::from_millis(sync_broker_member_group_period),
async move |_ctx| {
inner_.sync_broker_member_group().await;
Ok(())
},
);
let sync_broker_member_group_period =
self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Duration::from_millis(sync_broker_member_group_period),
async move |_ctx| {
inner_.sync_broker_member_group();
Ok(())
},
);
🧰 Tools
🪛 GitHub Actions: CI

[error] 1200-1200: Command 'cargo clippy -- -D warnings' failed with Rust error E0277: '()' is not a future. The code awaits inner_.sync_broker_member_group() at rocketmq-broker/src/broker_runtime.rs:1200:55. Remove '.await' or ensure the function returns a Future.

🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 1193 to 1203, the closure
currently calls inner_.sync_broker_member_group().await but
sync_broker_member_group is synchronous (returns ()), causing E0277; remove the
.await and call inner_.sync_broker_member_group() directly inside the async move
closure (i.e., invoke the method without awaiting), keeping the closure
signature and Ok(()) unchanged.

}

if self.inner.broker_config.enable_controller_mode {
Expand Down Expand Up @@ -1270,6 +1254,10 @@ impl BrokerRuntime {
Duration::from_millis(1000),
Duration::from_millis(broker_heartbeat_interval),
async move |_ctx| {
if inner_.is_isolated.load(Ordering::Acquire) {
info!("Skip send heartbeat for broker is isolated");
return Ok(());
}
inner_.send_heartbeat().await;
Ok(())
},
Expand Down Expand Up @@ -2478,7 +2466,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
CheetahString::from_string(addr)
}

pub fn sync_broker_member_group(&self) {
async fn sync_broker_member_group(&self) {
warn!("sync_broker_member_group not implemented");
}

Expand Down
Loading