Skip to content

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Sep 16, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4050

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • Bug Fixes

    • Heartbeats are now skipped when the broker is isolated, reducing unnecessary network traffic and log noise.
  • Refactor

    • Periodic background tasks (member-sync and heartbeats) consolidated into a centralized scheduler for more consistent timing and improved stability.
    • Member-group sync now runs at a controlled interval based on the configured sync period.

@Copilot Copilot AI review requested due to automatic review settings September 16, 2025 06:44
Copy link
Contributor

coderabbitai bot commented Sep 16, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Replaced ad-hoc spawned loops for periodic broker-member-group sync and heartbeat with fixed-rate tasks scheduled via ScheduledTaskManager; added an isolation guard to skip heartbeats when is_isolated is true; converted sync_broker_member_group from a synchronous pub fn to an async fn (non-public), and adapted closures to use cloned inner state.

Changes

Cohort / File(s) Summary
Scheduler refactor, heartbeat guard, and method signature change
rocketmq-broker/src/broker_runtime.rs
Replaced manual spawn/loop scheduling with ScheduledTaskManager::add_fixed_rate_task_async (1s initial delay, configured periods) for both broker-member-group sync and heartbeat; introduced sync_broker_member_group_period variable and cloned inner_ for task closures; added early-return when is_isolated to skip heartbeat and log; changed pub fn sync_broker_member_group(&self)async fn sync_broker_member_group(&self) (visibility reduced).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant BR as BrokerRuntime
  participant STM as ScheduledTaskManager
  participant IS as BrokerRuntimeInner
  participant NS as NameServer

  rect rgb(235,245,255)
    note over BR,STM: Startup schedules fixed-rate tasks
    BR->>STM: add_fixed_rate_task_async(sync_broker_member_group, period)
    BR->>STM: add_fixed_rate_task_async(heartbeat, period)
  end

  rect rgb(245,255,235)
    note right of STM: On each sync tick
    STM->>IS: IS.sync_broker_member_group() (async)
  end

  rect rgb(255,245,235)
    note right of STM: On each heartbeat tick
    STM->>IS: check IS.is_isolated
    alt isolated
      STM-->>IS: skip heartbeat (log)
    else not isolated
      STM->>IS: IS.send_heartbeat() (async)
      IS->>NS: Heartbeat RPC
      NS-->>IS: Ack/Result
    end
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

I thump a rhythm, schedules neat,
Fixed-rate hops keep perfect beat.
If burrows close and silence grows,
I pause my pulse until it flows.
A rabbit's nod to tidy time. 🐇

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Linked Issues Check ⚠️ Warning The changes implement the scheduling refactor targeted by issue #4050 by moving heartbeat and sync tasks to a ScheduledTaskManager and adding an isolation guard, but the PR omits updated unit tests and lacks a descriptive PR body or documentation; additionally it changes a previously public API (sync_broker_member_group → async private) without note, so the checklist items "update unit tests" and "ensure the refactor does not introduce new bugs" cannot be confirmed. Because required coding-related checklist items are not satisfied or documented, the PR does not fully comply with the linked issue's objectives. The absence of tests and API-change disclosure prevents a reliable verification that the refactor meets the issue requirements. Add or update unit tests that cover heartbeat scheduling, the ScheduledTaskManager behavior, and the sync_broker_member_group path; expand the PR description to document design choices and any public API changes or migrations; and run basic performance checks or include benchmarks and CI test results before merging.
Out of Scope Changes Check ⚠️ Warning The linked issue explicitly targets replacing manual heartbeat scheduling, but the PR also refactors the broker member-group sync into the ScheduledTaskManager and alters the visibility and signature of sync_broker_member_group, which are broader changes than the stated objective and affect the public API surface. These signature/visibility changes and the additional task refactor are out-of-scope relative to the minimal heartbeat-focused intent and should be called out. Without explicit mention in the issue or PR, these additional changes risk unexpected breakage for downstream users. Split unrelated changes (e.g., moving broker-member sync scheduling and any API signature/visibility changes) into a separate PR or clearly document and justify them here, restore or provide compatibility wrappers for any removed public methods, and add tests and migration notes to avoid surprising consumers.
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title directly and specifically describes the primary change—replacing manual heartbeat scheduling with a fixed-rate task manager—which matches the code changes that centralize scheduling via ScheduledTaskManager. It is concise and understandable to reviewers scanning history. It does include an issue tag and an emoji which are minor noise but do not make it misleading.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch refactor-4050

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Refactors the broker runtime to replace manual task scheduling with a centralized task manager for heartbeat operations. This change simplifies the code by eliminating custom timing logic and leverages a standardized approach for scheduling recurring tasks.

  • Replaced manual tokio spawn with loop-based timing logic with scheduled_task_manager.add_fixed_rate_task_async
  • Added isolation check to skip heartbeat sending when broker is isolated
  • Simplified sync_broker_member_group scheduling implementation
Comments suppressed due to low confidence (1)

rocketmq-broker/src/broker_runtime.rs:1

  • The original code calls sync_broker_member_group() synchronously, but the new implementation calls it with .await. This changes the execution model from blocking to async, which could affect timing and error handling behavior.
/*

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded 1000ms initial delay should use a named constant or configuration value for better maintainability, similar to how sync_broker_member_group_period is used.

Copilot uses AI. Check for mistakes.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
rocketmq-broker/src/broker_runtime.rs (1)

1257-1260: Isolation guard LGTM; avoid per-tick log spam.

Logging at info! every heartbeat can flood logs when isolated. Consider demoting to debug!/trace! or log only on state change.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 29f2e3e and 53dfa0f.

📒 Files selected for processing (1)
  • rocketmq-broker/src/broker_runtime.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-common/src/common/broker/broker_config.rs (1)
  • sync_broker_member_group_period (311-313)
🪛 GitHub Actions: CI
rocketmq-broker/src/broker_runtime.rs

[error] 1200-1200: Command 'cargo clippy -- -D warnings' failed with Rust error E0277: '()' is not a future. The code awaits inner_.sync_broker_member_group() at rocketmq-broker/src/broker_runtime.rs:1200:55. Remove '.await' or ensure the function returns a Future.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: test
  • GitHub Check: auto-approve

Comment on lines +1193 to +1203
let sync_broker_member_group_period =
self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Duration::from_millis(sync_broker_member_group_period),
async move |_ctx| {
inner_.sync_broker_member_group().await;
Ok(())
},
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix build: don’t await sync_broker_member_group() (E0277).

sync_broker_member_group is synchronous (returns ()), so awaiting it triggers E0277. Call it directly.

-                    inner_.sync_broker_member_group().await;
+                    inner_.sync_broker_member_group();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let sync_broker_member_group_period =
self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Duration::from_millis(sync_broker_member_group_period),
async move |_ctx| {
inner_.sync_broker_member_group().await;
Ok(())
},
);
let sync_broker_member_group_period =
self.inner.broker_config.sync_broker_member_group_period;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Duration::from_millis(sync_broker_member_group_period),
async move |_ctx| {
inner_.sync_broker_member_group();
Ok(())
},
);
🧰 Tools
🪛 GitHub Actions: CI

[error] 1200-1200: Command 'cargo clippy -- -D warnings' failed with Rust error E0277: '()' is not a future. The code awaits inner_.sync_broker_member_group() at rocketmq-broker/src/broker_runtime.rs:1200:55. Remove '.await' or ensure the function returns a Future.

🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 1193 to 1203, the closure
currently calls inner_.sync_broker_member_group().await but
sync_broker_member_group is synchronous (returns ()), causing E0277; remove the
.await and call inner_.sync_broker_member_group() directly inside the async move
closure (i.e., invoke the method without awaiting), keeping the closure
signature and Ok(()) unchanged.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link

codecov bot commented Sep 16, 2025

Codecov Report

❌ Patch coverage is 0% with 15 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.52%. Comparing base (29f2e3e) to head (562080a).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rocketmq-broker/src/broker_runtime.rs 0.00% 15 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #4051   +/-   ##
=======================================
  Coverage   26.51%   26.52%           
=======================================
  Files         570      570           
  Lines       80926    80922    -4     
=======================================
  Hits        21461    21461           
+ Misses      59465    59461    -4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
rocketmq-broker/src/broker_runtime.rs (1)

1193-1203: Await is correct here; replace magic 1000ms with a named constant and guard against zero/too‑small period.

  • The previous E0277 bot comment is obsolete: sync_broker_member_group is now async; .await is required.
  • Please avoid the hardcoded 1000ms initial delay and validate the period to prevent a zero/overly small cadence. Also consider reusing the same named constant in schedule_send_heartbeat() for consistency.

Apply this diff:

-            self.scheduled_task_manager.add_fixed_rate_task_async(
-                Duration::from_millis(1000),
-                Duration::from_millis(sync_broker_member_group_period),
+            self.scheduled_task_manager.add_fixed_rate_task_async(
+                Duration::from_millis(SYNC_MEMBER_GROUP_INITIAL_DELAY_MS),
+                Duration::from_millis(sync_broker_member_group_period.max(1_000)),

Add this module-level constant (or place near related scheduling code):

const SYNC_MEMBER_GROUP_INITIAL_DELAY_MS: u64 = 1_000;
🧹 Nitpick comments (2)
rocketmq-broker/src/broker_runtime.rs (2)

1257-1260: Reduce heartbeat skip log noise (use debug or sample).

When isolated, this will log every tick. Downgrade to debug or throttle.

-                if inner_.is_isolated.load(Ordering::Acquire) {
-                    info!("Skip send heartbeat for broker is isolated");
+                if inner_.is_isolated.load(Ordering::Acquire) {
+                    tracing::debug!("Skip send heartbeat: broker is isolated");
                     return Ok(());
                 }

2469-2471: sync_broker_member_group() is async but a no‑op; avoid warn spam until implemented.

The scheduled task will call this frequently; emitting warn! each time is noisy. Use debug and leave a TODO. Optionally wire it to BrokerOuterAPI::sync_broker_member_group when ready.

-    async fn sync_broker_member_group(&self) {
-        warn!("sync_broker_member_group not implemented");
+    async fn sync_broker_member_group(&self) {
+        // TODO: implement: query NameSrv and update broker_member_group.
+        tracing::debug!("sync_broker_member_group: not implemented");
     }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53dfa0f and 562080a.

📒 Files selected for processing (1)
  • rocketmq-broker/src/broker_runtime.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/broker_runtime.rs (1)
rocketmq-broker/src/out_api/broker_outer_api.rs (1)
  • sync_broker_member_group (750-757)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: test
  • GitHub Check: build
  • GitHub Check: auto-approve

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit 22fa414 into main Sep 16, 2025
18 of 19 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Sep 16, 2025
@mxsm mxsm deleted the refactor-4050 branch September 19, 2025 16:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge refactor♻️ refactor code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Refactor♻️]Replace manual heartbeat scheduling with a fixed-rate task manager

3 participants