Skip to content

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Sep 16, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4048

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features
    • Introduced periodic broker heartbeat scheduling in slave-acting-master and controller modes, enabling automated liveness updates without manual intervention on supported deployments.
    • Added a new broker configuration option to set the heartbeat interval (default: 1000 ms), allowing operators to fine-tune frequency based on deployment needs. The setting is applied on startup when the above modes are enabled.

@Copilot Copilot AI review requested due to automatic review settings September 16, 2025 03:49
Copy link
Contributor

coderabbitai bot commented Sep 16, 2025

Walkthrough

Implements periodic heartbeat scheduling in broker runtime, triggered on startup when specific modes are enabled. Adds a new broker_heartbeat_interval configuration (default 1000 ms) and wires it into scheduling. Heartbeat execution stub logs an error placeholder.

Changes

Cohort / File(s) Summary
Heartbeat scheduling in runtime
rocketmq-broker/src/broker_runtime.rs
Implemented schedule_send_heartbeat to run after 1s and then every broker_heartbeat_interval; added BrokerRuntimeInner::send_heartbeat (stub logging error); invoked scheduling during start() when enable_slave_acting_master or enable_controller_mode are true; imported tracing::error.
Config: heartbeat interval
rocketmq-common/src/common/broker/broker_config.rs
Added defaults::broker_heartbeat_interval() -> u64 = 1000; introduced BrokerConfig.broker_heartbeat_interval with serde default; updated Default impl to set 1000.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin as Operator/Config
  participant Start as BrokerRuntime::start()
  participant Runtime as BrokerRuntime
  participant Scheduler as Tokio Task Scheduler
  participant Inner as BrokerRuntimeInner

  Admin->>Start: Start broker
  Start->>Runtime: Check modes (slave acting master / controller)
  alt modes enabled
    Runtime->>Scheduler: schedule_send_heartbeat(initial=1s, period=broker_heartbeat_interval)
    loop every interval
      Scheduler->>Inner: send_heartbeat()
      note right of Inner: Current implementation logs "unimplemented"
    end
  else modes disabled
    Start-->>Admin: Continue without heartbeat scheduling
  end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

I twitch my ears to timed beats, tick-tick—go!
A heartbeat drum beneath the broker’s flow.
New knobs set pace, 1000 marks the trail,
Tasks hop in loops, they never fail—
For now they thump a “TODO” rhyme,
Soon they’ll pulse in perfect time. 🐇⏱️

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 11.11% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title accurately summarizes the primary change—adding a broker heartbeat interval configuration and scheduling—which matches the edits in broker_config.rs and broker_runtime.rs; the issue tag and emoji are minor noise but do not prevent clarity.
Linked Issues Check ✅ Passed The changes fulfill the linked issue #4048 by adding broker_heartbeat_interval (with default) and wiring a scheduled heartbeat task into broker startup, but the send_heartbeat implementation is currently a stub that logs "unimplemented" and there are no tests or documentation shown, so configuration and scheduling are implemented while runtime behavior remains incomplete.
Out of Scope Changes Check ✅ Passed All modifications are confined to broker_runtime.rs and broker_config.rs (and a tracing import) and directly relate to adding heartbeat configuration and scheduling; no unrelated files or API signature changes are evident from the provided summary, so there are no apparent out-of-scope changes.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature-4048

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Sep 16, 2025
@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds broker heartbeat interval configuration functionality to the RocketMQ Rust implementation. It introduces a configurable heartbeat interval setting and implements the scheduling mechanism for heartbeat sending.

  • Adds broker_heartbeat_interval configuration parameter with 1000ms default value
  • Implements heartbeat scheduling using a fixed-rate task
  • Adds placeholder implementation for the send_heartbeat method

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
rocketmq-common/src/common/broker/broker_config.rs Adds broker_heartbeat_interval configuration field with default value and serialization support
rocketmq-broker/src/broker_runtime.rs Implements heartbeat scheduling logic and adds placeholder send_heartbeat method

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +1270 to +1271
Duration::from_millis(1000),
Duration::from_millis(broker_heartbeat_interval),
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded initial delay of 1000ms should use the same configurable interval as the periodic interval. Consider using Duration::from_millis(broker_heartbeat_interval) for both parameters to maintain consistency.

Suggested change
Duration::from_millis(1000),
Duration::from_millis(broker_heartbeat_interval),
Duration::from_millis(broker_heartbeat_interval),
Duration::from_millis(broker_heartbeat_interval),

Copilot uses AI. Check for mistakes.

}

async fn send_heartbeat(&self) {
error!("unimplemented")
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message should be more descriptive and follow consistent formatting. Consider using 'BrokerRuntimeInner::send_heartbeat is not implemented' to match the pattern used in other unimplemented methods in this file.

Suggested change
error!("unimplemented")
unimplemented!("BrokerRuntimeInner::send_heartbeat is not implemented")

Copilot uses AI. Check for mistakes.

Copy link

codecov bot commented Sep 16, 2025

Codecov Report

❌ Patch coverage is 0% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.51%. Comparing base (2c32b33) to head (38fce6f).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rocketmq-broker/src/broker_runtime.rs 0.00% 14 Missing ⚠️
rocketmq-common/src/common/broker/broker_config.rs 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4049      +/-   ##
==========================================
- Coverage   26.52%   26.51%   -0.01%     
==========================================
  Files         570      570              
  Lines       80910    80926      +16     
==========================================
  Hits        21461    21461              
- Misses      59449    59465      +16     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit 29f2e3e into main Sep 16, 2025
20 of 23 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Sep 16, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
rocketmq-common/src/common/broker/broker_config.rs (1)

770-772: Expose heartbeat interval in properties for observability.

Other BrokerConfig fields are surfaced via get_properties(); consider adding "brokerHeartbeatInterval" for parity and easier introspection.

rocketmq-broker/src/broker_runtime.rs (2)

2592-2594: Downgrade log level to avoid periodic error spam until implemented.

This runs every tick; error! will flood logs. Use warn!/debug!/trace! instead.

-    async fn send_heartbeat(&self) {
-        error!("unimplemented")
-    }
+    async fn send_heartbeat(&self) {
+        warn!("send_heartbeat not implemented");
+    }

Note: If you adopt this, remove the now-unused use tracing::error;.


58-58: Import is only used by the stub.

If you switch to warn!/debug!, drop this import.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2c32b33 and 38fce6f.

📒 Files selected for processing (2)
  • rocketmq-broker/src/broker_runtime.rs (3 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/broker_runtime.rs (3)
rocketmq-common/src/common/broker/broker_config.rs (1)
  • broker_heartbeat_interval (371-373)
rocketmq-store/src/ha/default_ha_connection.rs (1)
  • send_heartbeat (709-725)
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
  • send_heartbeat (1094-1122)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build
  • GitHub Check: test
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-common/src/common/broker/broker_config.rs (1)

892-893: Default looks good.

1000 ms is a reasonable default and aligns with other interval defaults.

Comment on lines +1266 to +1277
pub(crate) fn schedule_send_heartbeat(&mut self) {
let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
Duration::from_millis(broker_heartbeat_interval),
async move |_ctx| {
inner_.send_heartbeat().await;
Ok(())
},
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Avoid double-scheduling and clamp the period.

  • Risk: start() calls schedule_send_heartbeat() under both enable_slave_acting_master and enable_controller_mode; if both true, heartbeats are scheduled twice.
  • Also clamp the period to a safe range to avoid near-0 delays.

Apply this diff to clamp the interval locally:

 pub(crate) fn schedule_send_heartbeat(&mut self) {
-    let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+    let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+    // defend against 0/too-small/too-large values
+    let period_ms = broker_heartbeat_interval.clamp(100, 60_000);
     let inner_ = self.inner.clone();
     self.scheduled_task_manager.add_fixed_rate_task_async(
         Duration::from_millis(1000),
-        Duration::from_millis(broker_heartbeat_interval),
+        Duration::from_millis(period_ms),
         async move |_ctx| {
             inner_.send_heartbeat().await;
             Ok(())
         },
     );
 }

And in start(), call it once when either flag is set:

// start()
if self.inner.broker_config.enable_slave_acting_master
    || self.inner.broker_config.enable_controller_mode
{
    self.schedule_send_heartbeat();
}

🏁 Script executed:

#!/bin/bash
# Show all call sites to confirm singular scheduling
rg -n -C2 'schedule_send_heartbeat\s*\(' -- rocketmq-broker/src/broker_runtime.rs

Length of output: 593


Avoid double-scheduling heartbeats and clamp heartbeat period

  • start() currently calls schedule_send_heartbeat() in two places (lines 1192 and 1223) — this double-schedules the task.
  • Clamp broker_heartbeat_interval to a safe range to prevent near‑0 or huge periods.

Location: rocketmq-broker/src/broker_runtime.rs — calls at lines 1192, 1223; schedule_send_heartbeat() at 1266.

Apply these changes:

pub(crate) fn schedule_send_heartbeat(&mut self) {
-    let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+    let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+    // defend against 0/too-small/too-large values
+    let period_ms = broker_heartbeat_interval.clamp(100, 60_000);
     let inner_ = self.inner.clone();
     self.scheduled_task_manager.add_fixed_rate_task_async(
         Duration::from_millis(1000),
-        Duration::from_millis(broker_heartbeat_interval),
+        Duration::from_millis(period_ms),
         async move |_ctx| {
             inner_.send_heartbeat().await;
             Ok(())
         },
     );
}
// start()
-if self.inner.broker_config.enable_slave_acting_master
-    {
-    self.schedule_send_heartbeat();
-}
-if self.inner.broker_config.enable_controller_mode
-{
-    self.schedule_send_heartbeat();
-}
+if self.inner.broker_config.enable_slave_acting_master
+    || self.inner.broker_config.enable_controller_mode
+{
+    self.schedule_send_heartbeat();
+}
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 1192, 1223 and 1266, the
send-heartbeat task is being scheduled twice and the broker_heartbeat_interval
is used raw; to fix, ensure schedule_send_heartbeat() is only invoked once
(remove the duplicate call at either line 1192 or 1223 or gate the second call
behind a boolean/initialized flag so the task is scheduled idempotently), and
clamp broker_heartbeat_interval to a safe range before converting to Duration
(e.g. let interval_ms = broker_heartbeat_interval.clamp(500, 86_400_000); then
use Duration::from_millis(interval_ms)) so near‑zero or huge periods are
prevented.

Comment on lines +370 to +373
#[inline]
pub fn broker_heartbeat_interval() -> u64 {
1000
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Guard against zero/too-small intervals to prevent hot-looping.

If users set broker_heartbeat_interval to 0 or an extremely small value, the scheduler will spin at near-0 delay. Enforce a sane minimum (e.g., clamp to [100, 60000] ms) at read/use time.

Would you like me to wire a clamp in the scheduler (preferred) or add serde-time validation on deserialization?


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "=== repo root: $(pwd) ==="
echo "=== Searching for broker_heartbeat_interval ==="
rg -n --hidden -S "broker_heartbeat_interval" || true
echo "=== Searching for 'broker_heartbeat' ==="
rg -n --hidden -S "broker_heartbeat" || true
echo "=== Searching for BrokerConfig struct ==="
rg -n --hidden -S "struct\s+BrokerConfig" -g '!**/target/**' || true
echo "=== Listing files that might define broker config ==="
rg -n --hidden -S "broker.*config" || true
file=$(rg -l --hidden -S "pub fn broker_heartbeat_interval" || true)
if [ -n "$file" ]; then
  echo "Found in: $file"
  sed -n '1,240p' "$file"
fi
echo "=== Searching for uses of broker_heartbeat_interval at call sites ==="
rg -n --hidden -S "broker_heartbeat_interval\s*\(" || true
rg -n --hidden -S "\.broker_heartbeat_interval\b" || true

Length of output: 77150


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "=== repo root: $(pwd) ==="
echo "=== rg context for broker_heartbeat_interval ==="
rg -n --hidden -S -C3 "broker_heartbeat_interval" || true
echo
echo "=== Showing broker_runtime.rs (1200..1300) ==="
sed -n '1200,1300p' rocketmq-broker/src/broker_runtime.rs || true
echo
echo "=== Showing broker_config.rs (340..420) ==="
sed -n '340,420p' rocketmq-common/src/common/broker/broker_config.rs || true
echo
echo "=== Showing broker_config.rs (740..920) ==="
sed -n '740,920p' rocketmq-common/src/common/broker/broker_config.rs || true

Length of output: 15964


Guard against zero/too-small intervals to prevent hot-looping.

broker_heartbeat_interval defaults to 1000 (defaults::broker_heartbeat_interval) and is serde-defaulted into BrokerConfig, but schedule_send_heartbeat uses broker_config.broker_heartbeat_interval directly (rocketmq-broker/src/broker_runtime.rs::schedule_send_heartbeat — Duration::from_millis(broker_heartbeat_interval)) with no clamp; a configured 0 or very small value will cause the scheduler to spin. Enforce a sane clamp at use time (preferred) — e.g. clamp to [100, 60000] ms before calling add_fixed_rate_task_async in schedule_send_heartbeat. Alternatively, add serde-level validation in rocketmq-common/src/common/broker/broker_config.rs to reject invalid values.

Would you like me to wire the clamp in the scheduler (preferred) or add serde-time validation on deserialization?

🤖 Prompt for AI Agents
In rocketmq-common/src/common/broker/broker_config.rs around lines 370-373 the
broker_heartbeat_interval default is 1000 but callers currently use the raw
value; update the scheduler instead: in rocketmq-broker/src/broker_runtime.rs
where schedule_send_heartbeat calls
Duration::from_millis(broker_config.broker_heartbeat_interval), clamp that value
to a sane range (e.g. min 100 ms, max 60000 ms) before constructing the Duration
and before passing it to add_fixed_rate_task_async to avoid hot-looping from 0
or tiny values; alternatively, if you prefer validation on deserialization, add
serde-level checks to broker_config to reject values outside that range, but
prefer the scheduler clamp as the primary fix.

@mxsm mxsm deleted the feature-4048 branch September 19, 2025 16:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Add broker heartbeat interval configuration and scheduling

3 participants