-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4050]♻️Replace manual heartbeat scheduling with a fixed-rate task manager #4051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughReplaced ad-hoc spawned loops for periodic broker-member-group sync and heartbeat with fixed-rate tasks scheduled via ScheduledTaskManager; added an isolation guard to skip heartbeats when Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant BR as BrokerRuntime
participant STM as ScheduledTaskManager
participant IS as BrokerRuntimeInner
participant NS as NameServer
rect rgb(235,245,255)
note over BR,STM: Startup schedules fixed-rate tasks
BR->>STM: add_fixed_rate_task_async(sync_broker_member_group, period)
BR->>STM: add_fixed_rate_task_async(heartbeat, period)
end
rect rgb(245,255,235)
note right of STM: On each sync tick
STM->>IS: IS.sync_broker_member_group() (async)
end
rect rgb(255,245,235)
note right of STM: On each heartbeat tick
STM->>IS: check IS.is_isolated
alt isolated
STM-->>IS: skip heartbeat (log)
else not isolated
STM->>IS: IS.send_heartbeat() (async)
IS->>NS: Heartbeat RPC
NS-->>IS: Ack/Result
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (3 warnings)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Refactors the broker runtime to replace manual task scheduling with a centralized task manager for heartbeat operations. This change simplifies the code by eliminating custom timing logic and leverages a standardized approach for scheduling recurring tasks.
- Replaced manual tokio spawn with loop-based timing logic with
scheduled_task_manager.add_fixed_rate_task_async
- Added isolation check to skip heartbeat sending when broker is isolated
- Simplified sync_broker_member_group scheduling implementation
Comments suppressed due to low confidence (1)
rocketmq-broker/src/broker_runtime.rs:1
- The original code calls
sync_broker_member_group()
synchronously, but the new implementation calls it with.await
. This changes the execution model from blocking to async, which could affect timing and error handling behavior.
/*
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
self.inner.broker_config.sync_broker_member_group_period; | ||
let inner_ = self.inner.clone(); | ||
self.scheduled_task_manager.add_fixed_rate_task_async( | ||
Duration::from_millis(1000), |
Copilot
AI
Sep 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded 1000ms initial delay should use a named constant or configuration value for better maintainability, similar to how sync_broker_member_group_period
is used.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rocketmq-broker/src/broker_runtime.rs (1)
1257-1260
: Isolation guard LGTM; avoid per-tick log spam.Logging at info! every heartbeat can flood logs when isolated. Consider demoting to debug!/trace! or log only on state change.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-broker/src/broker_runtime.rs
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-common/src/common/broker/broker_config.rs (1)
sync_broker_member_group_period
(311-313)
🪛 GitHub Actions: CI
rocketmq-broker/src/broker_runtime.rs
[error] 1200-1200: Command 'cargo clippy -- -D warnings' failed with Rust error E0277: '()' is not a future. The code awaits inner_.sync_broker_member_group() at rocketmq-broker/src/broker_runtime.rs:1200:55. Remove '.await' or ensure the function returns a Future.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: test
- GitHub Check: auto-approve
let sync_broker_member_group_period = | ||
self.inner.broker_config.sync_broker_member_group_period; | ||
let inner_ = self.inner.clone(); | ||
self.scheduled_task_manager.add_fixed_rate_task_async( | ||
Duration::from_millis(1000), | ||
Duration::from_millis(sync_broker_member_group_period), | ||
async move |_ctx| { | ||
inner_.sync_broker_member_group().await; | ||
Ok(()) | ||
}, | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix build: don’t await sync_broker_member_group() (E0277).
sync_broker_member_group is synchronous (returns ()), so awaiting it triggers E0277. Call it directly.
- inner_.sync_broker_member_group().await;
+ inner_.sync_broker_member_group();
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let sync_broker_member_group_period = | |
self.inner.broker_config.sync_broker_member_group_period; | |
let inner_ = self.inner.clone(); | |
self.scheduled_task_manager.add_fixed_rate_task_async( | |
Duration::from_millis(1000), | |
Duration::from_millis(sync_broker_member_group_period), | |
async move |_ctx| { | |
inner_.sync_broker_member_group().await; | |
Ok(()) | |
}, | |
); | |
let sync_broker_member_group_period = | |
self.inner.broker_config.sync_broker_member_group_period; | |
let inner_ = self.inner.clone(); | |
self.scheduled_task_manager.add_fixed_rate_task_async( | |
Duration::from_millis(1000), | |
Duration::from_millis(sync_broker_member_group_period), | |
async move |_ctx| { | |
inner_.sync_broker_member_group(); | |
Ok(()) | |
}, | |
); |
🧰 Tools
🪛 GitHub Actions: CI
[error] 1200-1200: Command 'cargo clippy -- -D warnings' failed with Rust error E0277: '()' is not a future. The code awaits inner_.sync_broker_member_group() at rocketmq-broker/src/broker_runtime.rs:1200:55. Remove '.await' or ensure the function returns a Future.
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 1193 to 1203, the closure
currently calls inner_.sync_broker_member_group().await but
sync_broker_member_group is synchronous (returns ()), causing E0277; remove the
.await and call inner_.sync_broker_member_group() directly inside the async move
closure (i.e., invoke the method without awaiting), keeping the closure
signature and Ok(()) unchanged.
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4051 +/- ##
=======================================
Coverage 26.51% 26.52%
=======================================
Files 570 570
Lines 80926 80922 -4
=======================================
Hits 21461 21461
+ Misses 59465 59461 -4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
rocketmq-broker/src/broker_runtime.rs (1)
1193-1203
: Await is correct here; replace magic 1000ms with a named constant and guard against zero/too‑small period.
- The previous E0277 bot comment is obsolete:
sync_broker_member_group
is now async;.await
is required.- Please avoid the hardcoded 1000ms initial delay and validate the period to prevent a zero/overly small cadence. Also consider reusing the same named constant in
schedule_send_heartbeat()
for consistency.Apply this diff:
- self.scheduled_task_manager.add_fixed_rate_task_async( - Duration::from_millis(1000), - Duration::from_millis(sync_broker_member_group_period), + self.scheduled_task_manager.add_fixed_rate_task_async( + Duration::from_millis(SYNC_MEMBER_GROUP_INITIAL_DELAY_MS), + Duration::from_millis(sync_broker_member_group_period.max(1_000)),Add this module-level constant (or place near related scheduling code):
const SYNC_MEMBER_GROUP_INITIAL_DELAY_MS: u64 = 1_000;
🧹 Nitpick comments (2)
rocketmq-broker/src/broker_runtime.rs (2)
1257-1260
: Reduce heartbeat skip log noise (use debug or sample).When isolated, this will log every tick. Downgrade to debug or throttle.
- if inner_.is_isolated.load(Ordering::Acquire) { - info!("Skip send heartbeat for broker is isolated"); + if inner_.is_isolated.load(Ordering::Acquire) { + tracing::debug!("Skip send heartbeat: broker is isolated"); return Ok(()); }
2469-2471
:sync_broker_member_group()
is async but a no‑op; avoid warn spam until implemented.The scheduled task will call this frequently; emitting
warn!
each time is noisy. Use debug and leave a TODO. Optionally wire it toBrokerOuterAPI::sync_broker_member_group
when ready.- async fn sync_broker_member_group(&self) { - warn!("sync_broker_member_group not implemented"); + async fn sync_broker_member_group(&self) { + // TODO: implement: query NameSrv and update broker_member_group. + tracing::debug!("sync_broker_member_group: not implemented"); }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-broker/src/broker_runtime.rs
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-broker/src/out_api/broker_outer_api.rs (1)
sync_broker_member_group
(750-757)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: test
- GitHub Check: build
- GitHub Check: auto-approve
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #4050
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Bug Fixes
Refactor