-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4048]🚀Add broker heartbeat interval configuration and scheduling #4049
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughImplements periodic heartbeat scheduling in broker runtime, triggered on startup when specific modes are enabled. Adds a new broker_heartbeat_interval configuration (default 1000 ms) and wires it into scheduling. Heartbeat execution stub logs an error placeholder. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Admin as Operator/Config
participant Start as BrokerRuntime::start()
participant Runtime as BrokerRuntime
participant Scheduler as Tokio Task Scheduler
participant Inner as BrokerRuntimeInner
Admin->>Start: Start broker
Start->>Runtime: Check modes (slave acting master / controller)
alt modes enabled
Runtime->>Scheduler: schedule_send_heartbeat(initial=1s, period=broker_heartbeat_interval)
loop every interval
Scheduler->>Inner: send_heartbeat()
note right of Inner: Current implementation logs "unimplemented"
end
else modes disabled
Start-->>Admin: Continue without heartbeat scheduling
end
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds broker heartbeat interval configuration functionality to the RocketMQ Rust implementation. It introduces a configurable heartbeat interval setting and implements the scheduling mechanism for heartbeat sending.
- Adds
broker_heartbeat_interval
configuration parameter with 1000ms default value - Implements heartbeat scheduling using a fixed-rate task
- Adds placeholder implementation for the
send_heartbeat
method
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
rocketmq-common/src/common/broker/broker_config.rs | Adds broker_heartbeat_interval configuration field with default value and serialization support |
rocketmq-broker/src/broker_runtime.rs | Implements heartbeat scheduling logic and adds placeholder send_heartbeat method |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Duration::from_millis(1000), | ||
Duration::from_millis(broker_heartbeat_interval), |
Copilot
AI
Sep 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded initial delay of 1000ms should use the same configurable interval as the periodic interval. Consider using Duration::from_millis(broker_heartbeat_interval)
for both parameters to maintain consistency.
Duration::from_millis(1000), | |
Duration::from_millis(broker_heartbeat_interval), | |
Duration::from_millis(broker_heartbeat_interval), | |
Duration::from_millis(broker_heartbeat_interval), |
Copilot uses AI. Check for mistakes.
} | ||
|
||
async fn send_heartbeat(&self) { | ||
error!("unimplemented") |
Copilot
AI
Sep 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message should be more descriptive and follow consistent formatting. Consider using 'BrokerRuntimeInner::send_heartbeat is not implemented' to match the pattern used in other unimplemented methods in this file.
error!("unimplemented") | |
unimplemented!("BrokerRuntimeInner::send_heartbeat is not implemented") |
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4049 +/- ##
==========================================
- Coverage 26.52% 26.51% -0.01%
==========================================
Files 570 570
Lines 80910 80926 +16
==========================================
Hits 21461 21461
- Misses 59449 59465 +16 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
rocketmq-common/src/common/broker/broker_config.rs (1)
770-772
: Expose heartbeat interval in properties for observability.Other BrokerConfig fields are surfaced via get_properties(); consider adding "brokerHeartbeatInterval" for parity and easier introspection.
rocketmq-broker/src/broker_runtime.rs (2)
2592-2594
: Downgrade log level to avoid periodic error spam until implemented.This runs every tick; error! will flood logs. Use warn!/debug!/trace! instead.
- async fn send_heartbeat(&self) { - error!("unimplemented") - } + async fn send_heartbeat(&self) { + warn!("send_heartbeat not implemented"); + }Note: If you adopt this, remove the now-unused
use tracing::error;
.
58-58
: Import is only used by the stub.If you switch to warn!/debug!, drop this import.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-broker/src/broker_runtime.rs
(3 hunks)rocketmq-common/src/common/broker/broker_config.rs
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/broker_runtime.rs (3)
rocketmq-common/src/common/broker/broker_config.rs (1)
broker_heartbeat_interval
(371-373)rocketmq-store/src/ha/default_ha_connection.rs (1)
send_heartbeat
(709-725)rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
send_heartbeat
(1094-1122)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: auto-approve
🔇 Additional comments (1)
rocketmq-common/src/common/broker/broker_config.rs (1)
892-893
: Default looks good.1000 ms is a reasonable default and aligns with other interval defaults.
pub(crate) fn schedule_send_heartbeat(&mut self) { | ||
let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval; | ||
let inner_ = self.inner.clone(); | ||
self.scheduled_task_manager.add_fixed_rate_task_async( | ||
Duration::from_millis(1000), | ||
Duration::from_millis(broker_heartbeat_interval), | ||
async move |_ctx| { | ||
inner_.send_heartbeat().await; | ||
Ok(()) | ||
}, | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Avoid double-scheduling and clamp the period.
- Risk: start() calls schedule_send_heartbeat() under both enable_slave_acting_master and enable_controller_mode; if both true, heartbeats are scheduled twice.
- Also clamp the period to a safe range to avoid near-0 delays.
Apply this diff to clamp the interval locally:
pub(crate) fn schedule_send_heartbeat(&mut self) {
- let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+ let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+ // defend against 0/too-small/too-large values
+ let period_ms = broker_heartbeat_interval.clamp(100, 60_000);
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
- Duration::from_millis(broker_heartbeat_interval),
+ Duration::from_millis(period_ms),
async move |_ctx| {
inner_.send_heartbeat().await;
Ok(())
},
);
}
And in start(), call it once when either flag is set:
// start()
if self.inner.broker_config.enable_slave_acting_master
|| self.inner.broker_config.enable_controller_mode
{
self.schedule_send_heartbeat();
}
🏁 Script executed:
#!/bin/bash
# Show all call sites to confirm singular scheduling
rg -n -C2 'schedule_send_heartbeat\s*\(' -- rocketmq-broker/src/broker_runtime.rs
Length of output: 593
Avoid double-scheduling heartbeats and clamp heartbeat period
- start() currently calls schedule_send_heartbeat() in two places (lines 1192 and 1223) — this double-schedules the task.
- Clamp broker_heartbeat_interval to a safe range to prevent near‑0 or huge periods.
Location: rocketmq-broker/src/broker_runtime.rs — calls at lines 1192, 1223; schedule_send_heartbeat() at 1266.
Apply these changes:
pub(crate) fn schedule_send_heartbeat(&mut self) {
- let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+ let broker_heartbeat_interval = self.inner.broker_config.broker_heartbeat_interval;
+ // defend against 0/too-small/too-large values
+ let period_ms = broker_heartbeat_interval.clamp(100, 60_000);
let inner_ = self.inner.clone();
self.scheduled_task_manager.add_fixed_rate_task_async(
Duration::from_millis(1000),
- Duration::from_millis(broker_heartbeat_interval),
+ Duration::from_millis(period_ms),
async move |_ctx| {
inner_.send_heartbeat().await;
Ok(())
},
);
}
// start()
-if self.inner.broker_config.enable_slave_acting_master
- {
- self.schedule_send_heartbeat();
-}
-if self.inner.broker_config.enable_controller_mode
-{
- self.schedule_send_heartbeat();
-}
+if self.inner.broker_config.enable_slave_acting_master
+ || self.inner.broker_config.enable_controller_mode
+{
+ self.schedule_send_heartbeat();
+}
🤖 Prompt for AI Agents
In rocketmq-broker/src/broker_runtime.rs around lines 1192, 1223 and 1266, the
send-heartbeat task is being scheduled twice and the broker_heartbeat_interval
is used raw; to fix, ensure schedule_send_heartbeat() is only invoked once
(remove the duplicate call at either line 1192 or 1223 or gate the second call
behind a boolean/initialized flag so the task is scheduled idempotently), and
clamp broker_heartbeat_interval to a safe range before converting to Duration
(e.g. let interval_ms = broker_heartbeat_interval.clamp(500, 86_400_000); then
use Duration::from_millis(interval_ms)) so near‑zero or huge periods are
prevented.
#[inline] | ||
pub fn broker_heartbeat_interval() -> u64 { | ||
1000 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Guard against zero/too-small intervals to prevent hot-looping.
If users set broker_heartbeat_interval to 0 or an extremely small value, the scheduler will spin at near-0 delay. Enforce a sane minimum (e.g., clamp to [100, 60000] ms) at read/use time.
Would you like me to wire a clamp in the scheduler (preferred) or add serde-time validation on deserialization?
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== repo root: $(pwd) ==="
echo "=== Searching for broker_heartbeat_interval ==="
rg -n --hidden -S "broker_heartbeat_interval" || true
echo "=== Searching for 'broker_heartbeat' ==="
rg -n --hidden -S "broker_heartbeat" || true
echo "=== Searching for BrokerConfig struct ==="
rg -n --hidden -S "struct\s+BrokerConfig" -g '!**/target/**' || true
echo "=== Listing files that might define broker config ==="
rg -n --hidden -S "broker.*config" || true
file=$(rg -l --hidden -S "pub fn broker_heartbeat_interval" || true)
if [ -n "$file" ]; then
echo "Found in: $file"
sed -n '1,240p' "$file"
fi
echo "=== Searching for uses of broker_heartbeat_interval at call sites ==="
rg -n --hidden -S "broker_heartbeat_interval\s*\(" || true
rg -n --hidden -S "\.broker_heartbeat_interval\b" || true
Length of output: 77150
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== repo root: $(pwd) ==="
echo "=== rg context for broker_heartbeat_interval ==="
rg -n --hidden -S -C3 "broker_heartbeat_interval" || true
echo
echo "=== Showing broker_runtime.rs (1200..1300) ==="
sed -n '1200,1300p' rocketmq-broker/src/broker_runtime.rs || true
echo
echo "=== Showing broker_config.rs (340..420) ==="
sed -n '340,420p' rocketmq-common/src/common/broker/broker_config.rs || true
echo
echo "=== Showing broker_config.rs (740..920) ==="
sed -n '740,920p' rocketmq-common/src/common/broker/broker_config.rs || true
Length of output: 15964
Guard against zero/too-small intervals to prevent hot-looping.
broker_heartbeat_interval defaults to 1000 (defaults::broker_heartbeat_interval) and is serde-defaulted into BrokerConfig, but schedule_send_heartbeat uses broker_config.broker_heartbeat_interval directly (rocketmq-broker/src/broker_runtime.rs::schedule_send_heartbeat — Duration::from_millis(broker_heartbeat_interval)) with no clamp; a configured 0 or very small value will cause the scheduler to spin. Enforce a sane clamp at use time (preferred) — e.g. clamp to [100, 60000] ms before calling add_fixed_rate_task_async in schedule_send_heartbeat. Alternatively, add serde-level validation in rocketmq-common/src/common/broker/broker_config.rs to reject invalid values.
Would you like me to wire the clamp in the scheduler (preferred) or add serde-time validation on deserialization?
🤖 Prompt for AI Agents
In rocketmq-common/src/common/broker/broker_config.rs around lines 370-373 the
broker_heartbeat_interval default is 1000 but callers currently use the raw
value; update the scheduler instead: in rocketmq-broker/src/broker_runtime.rs
where schedule_send_heartbeat calls
Duration::from_millis(broker_config.broker_heartbeat_interval), clamp that value
to a sane range (e.g. min 100 ms, max 60000 ms) before constructing the Duration
and before passing it to add_fixed_rate_task_async to avoid hot-looping from 0
or tiny values; alternatively, if you prefer validation on deserialization, add
serde-level checks to broker_config to reject values outside that range, but
prefer the scheduler clamp as the primary fix.
Which Issue(s) This PR Fixes(Closes)
Fixes #4048
Brief Description
How Did You Test This Change?
Summary by CodeRabbit