Skip to content

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Sep 12, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #4029

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • Bug Fixes
    • Send requests are now promptly rejected when the broker is running as a slave without acting-master enabled, returning a clear “slave not available” response.
    • When system resources are constrained (e.g., OS page cache busy or transient storage pool low), requests receive a “system busy” response.
    • Improves client feedback and reduces unnecessary waiting under these conditions, enhancing reliability during high-load or restricted operation scenarios.

@Copilot Copilot AI review requested due to automatic review settings September 12, 2025 03:52
Copy link
Contributor

coderabbitai bot commented Sep 12, 2025

Walkthrough

Implements early request rejection in SendMessageProcessor by adding reject_request. It returns SlaveNotAvailable when in slave mode without acting-master, or SystemBusy when the store is busy or transient pool is deficient. Otherwise, it does not reject. Necessary enums/types are imported to support this logic.

Changes

Cohort / File(s) Summary of Changes
Send Message Processor
rocketmq-broker/src/processor/send_message_processor.rs
Added reject_request in RequestProcessor impl for SendMessageProcessor<MS, TS>: checks slave role vs. acting-master setting and store pressure (OS page cache busy or transient pool deficient); returns appropriate RejectRequestResponse. Added imports for BrokerRole and RejectRequestResponse.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Client
  participant P as SendMessageProcessor
  participant S as MessageStore

  C->>P: SendMessage request
  rect rgba(230,240,255,0.5)
    note right of P: New early-rejection via reject_request
    alt Slave mode without acting-master
      P-->>C: Reject: SlaveNotAvailable
    else Store busy or pool deficient
      P-->>C: Reject: SystemBusy
    else No rejection
      P-->>C: Proceed (no rejection)
      P->>S: Append message
      S-->>P: Store result
      P-->>C: Final response
    end
  end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Pre-merge checks (4 passed, 1 warning)

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title directly describes the primary change—adding a reject_request method for SendMessageProcessor to handle message acceptance conditions—so it is aligned with the changeset and clearly summarizes the main change; it is somewhat verbose and includes an emoji and issue number which are cosmetic but not misleading.
Linked Issues Check ✅ Passed Issue #4029 requested adding a reject_request method; the PR introduces that method on SendMessageProcessor and implements the acceptance checks described in the summary (slave acting-master refusal and store/transient busy -> SystemBusy), so the code changes meet the linked issue's coding objective even though the issue provided no additional acceptance criteria or tests.
Out of Scope Changes Check ✅ Passed The modifications are limited to implementing reject_request in send_message_processor.rs and adding necessary imports; no other files or unrelated features are indicated in the provided summary, so there are no apparent out-of-scope changes.

Poem

I thump my paw: “Not now,” I say,
When slaves can’t lead or caches sway.
If pages groan, I pause the queue—
A busy burrow won’t do!
But when the warren’s calm and clear,
I hop your message far and near. 🐇✨

✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature-4029

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Sep 12, 2025
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a new reject_request method to the send_message_processor to handle message acceptance conditions, implementing proper request rejection logic based on broker state and configuration.

  • Adds conditional logic to reject requests when broker is in slave mode without slave acting master enabled
  • Adds system busy detection to reject requests when message store is under resource pressure
  • Returns appropriate error responses with descriptive messages for rejected requests

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

true,
Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SlaveNotAvailable,
"The broker is slave mode, not allowed to accept message",
Copy link

Copilot AI Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message 'The broker is slave mode, not allowed to accept message' should be grammatically correct. Consider changing it to 'The broker is in slave mode and is not allowed to accept messages'.

Suggested change
"The broker is slave mode, not allowed to accept message",
"The broker is in slave mode and is not allowed to accept messages",

Copilot uses AI. Check for mistakes.

Comment on lines +159 to +162
Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemBusy,
"The broker message store is busy, please try again later",
)),
Copy link

Copilot AI Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message 'The broker is slave mode, not allowed to accept message' should be grammatically correct. Consider changing it to 'The broker is in slave mode and is not allowed to accept messages'.

Copilot uses AI. Check for mistakes.

Copy link

codecov bot commented Sep 12, 2025

Codecov Report

❌ Patch coverage is 0% with 33 lines in your changes missing coverage. Please review.
✅ Project coverage is 26.52%. Comparing base (92b82c2) to head (3f1faee).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...tmq-broker/src/processor/send_message_processor.rs 0.00% 33 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4030      +/-   ##
==========================================
- Coverage   26.53%   26.52%   -0.02%     
==========================================
  Files         565      565              
  Lines       80733    80766      +33     
==========================================
  Hits        21426    21426              
- Misses      59307    59340      +33     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit d5c9911 into main Sep 12, 2025
20 of 23 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Sep 12, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
rocketmq-broker/src/processor/send_message_processor.rs (3)

144-151: Nit: polish the rejection remark.
Minor grammar and clarity tweak for the slave-mode message.

Apply:

-                    "The broker is slave mode, not allowed to accept message",
+                    "The broker is in slave mode; not allowed to accept messages",

133-167: Optionally scope rejections to relevant request codes.
Not required, but you can avoid rejecting unrelated codes by using the provided code param.

Apply:

-    fn reject_request(&self, _code: i32) -> RejectRequestResponse {
+    fn reject_request(&self, code: i32) -> RejectRequestResponse {
+        let request_code = RequestCode::from(code);
+        if !matches!(
+            request_code,
+            RequestCode::SendMessage
+                | RequestCode::SendMessageV2
+                | RequestCode::SendBatchMessage
+                | RequestCode::ConsumerSendMsgBack
+        ) {
+            return (false, None);
+        }

133-167: Optional: add observability for early rejection paths.
A warn! with reason (slave mode or busy store) will help ops correlate client-side failures.

Would you like me to add targeted warn! logs for each early-return branch?

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92b82c2 and 3f1faee.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/send_message_processor.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/processor/send_message_processor.rs (5)
rocketmq-store/src/config/message_store_config.rs (1)
  • broker_role (223-225)
rocketmq-broker/src/processor.rs (2)
  • reject_request (142-160)
  • reject_request (235-251)
rocketmq-remoting/src/runtime/processor.rs (1)
  • reject_request (35-37)
rocketmq-remoting/src/protocol/remoting_command.rs (1)
  • create_response_command_with_code_remark (196-204)
rocketmq-broker/src/broker_runtime.rs (1)
  • message_store (1916-1918)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: test
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-broker/src/processor/send_message_processor.rs (3)

25-25: LGTM: needed enum import.
The BrokerRole import is correct for the new rejection logic.


69-69: LGTM: correct alias for trait return type.
RejectRequestResponse import matches the trait signature.


133-167: Checks passed — no action required. broker_role is a public field on MessageStoreConfig (rocketmq-store/src/config/message_store_config.rs); ResponseCode::SystemBusy is defined (rocketmq-remoting/src/code/response_code.rs); MessageStore declares is_os_page_cache_busy and is_transient_store_pool_deficient and LocalFileMessageStore implements them (rocketmq-store/src/base/message_store.rs; rocketmq-store/src/message_store/local_file_message_store.rs).

Comment on lines +153 to +164
let message_store = self.inner.broker_runtime_inner.message_store_unchecked();
if message_store.is_os_page_cache_busy()
|| message_store.is_transient_store_pool_deficient()
{
return (
true,
Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemBusy,
"The broker message store is busy, please try again later",
)),
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential panic: don’t call message_store_unchecked() here.
reject_request can be invoked very early; if the message store hasn’t been initialized, message_store_unchecked() may panic. Prefer checking the Option and return ServiceNotAvailable when absent.

Apply this diff within this block:

-        let message_store = self.inner.broker_runtime_inner.message_store_unchecked();
-        if message_store.is_os_page_cache_busy()
-            || message_store.is_transient_store_pool_deficient()
-        {
-            return (
-                true,
-                Some(RemotingCommand::create_response_command_with_code_remark(
-                    ResponseCode::SystemBusy,
-                    "The broker message store is busy, please try again later",
-                )),
-            );
-        }
+        if let Some(ms) = self.inner.broker_runtime_inner.message_store().as_ref() {
+            if ms.is_os_page_cache_busy() || ms.is_transient_store_pool_deficient() {
+                return (
+                    true,
+                    Some(RemotingCommand::create_response_command_with_code_remark(
+                        ResponseCode::SystemBusy,
+                        "The broker message store is busy, please try again later",
+                    )),
+                );
+            }
+        } else {
+            return (
+                true,
+                Some(RemotingCommand::create_response_command_with_code_remark(
+                    ResponseCode::ServiceNotAvailable,
+                    "Message store unavailable; broker not ready",
+                )),
+            );
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let message_store = self.inner.broker_runtime_inner.message_store_unchecked();
if message_store.is_os_page_cache_busy()
|| message_store.is_transient_store_pool_deficient()
{
return (
true,
Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemBusy,
"The broker message store is busy, please try again later",
)),
);
}
if let Some(ms) = self.inner.broker_runtime_inner.message_store().as_ref() {
if ms.is_os_page_cache_busy() || ms.is_transient_store_pool_deficient() {
return (
true,
Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemBusy,
"The broker message store is busy, please try again later",
)),
);
}
} else {
return (
true,
Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ServiceNotAvailable,
"Message store unavailable; broker not ready",
)),
);
}
🤖 Prompt for AI Agents
In rocketmq-broker/src/processor/send_message_processor.rs around lines 153 to
164, avoid calling message_store_unchecked() which may panic if the message
store isn't initialized; instead, first attempt to get the Option via
message_store() (or the safe accessor used elsewhere), and if it is None return
(true,
Some(RemotingCommand::create_response_command_with_code_remark(ResponseCode::ServiceNotAvailable,
"The broker message store is not available"))) ; only if Some(message_store)
proceed to call is_os_page_cache_busy() or is_transient_store_pool_deficient()
and return SystemBusy as before.

@mxsm mxsm deleted the feature-4029 branch September 14, 2025 08:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Add reject_request method to handle message acceptance conditions in send_message_processor

3 participants