-
Notifications
You must be signed in to change notification settings - Fork 174
[ISSUE #4029] 🚀Add reject_request method to handle message acceptance conditions in send_message_processor #4030
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… conditions in send_message_processor
WalkthroughImplements early request rejection in SendMessageProcessor by adding reject_request. It returns SlaveNotAvailable when in slave mode without acting-master, or SystemBusy when the store is busy or transient pool is deficient. Otherwise, it does not reject. Necessary enums/types are imported to support this logic. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant P as SendMessageProcessor
participant S as MessageStore
C->>P: SendMessage request
rect rgba(230,240,255,0.5)
note right of P: New early-rejection via reject_request
alt Slave mode without acting-master
P-->>C: Reject: SlaveNotAvailable
else Store busy or pool deficient
P-->>C: Reject: SystemBusy
else No rejection
P-->>C: Proceed (no rejection)
P->>S: Append message
S-->>P: Store result
P-->>C: Final response
end
end
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Pre-merge checks (4 passed, 1 warning)❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Poem
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds a new reject_request
method to the send_message_processor
to handle message acceptance conditions, implementing proper request rejection logic based on broker state and configuration.
- Adds conditional logic to reject requests when broker is in slave mode without slave acting master enabled
- Adds system busy detection to reject requests when message store is under resource pressure
- Returns appropriate error responses with descriptive messages for rejected requests
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
true, | ||
Some(RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SlaveNotAvailable, | ||
"The broker is slave mode, not allowed to accept message", |
Copilot
AI
Sep 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message 'The broker is slave mode, not allowed to accept message' should be grammatically correct. Consider changing it to 'The broker is in slave mode and is not allowed to accept messages'.
"The broker is slave mode, not allowed to accept message", | |
"The broker is in slave mode and is not allowed to accept messages", |
Copilot uses AI. Check for mistakes.
Some(RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SystemBusy, | ||
"The broker message store is busy, please try again later", | ||
)), |
Copilot
AI
Sep 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message 'The broker is slave mode, not allowed to accept message' should be grammatically correct. Consider changing it to 'The broker is in slave mode and is not allowed to accept messages'.
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4030 +/- ##
==========================================
- Coverage 26.53% 26.52% -0.02%
==========================================
Files 565 565
Lines 80733 80766 +33
==========================================
Hits 21426 21426
- Misses 59307 59340 +33 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
rocketmq-broker/src/processor/send_message_processor.rs (3)
144-151
: Nit: polish the rejection remark.
Minor grammar and clarity tweak for the slave-mode message.Apply:
- "The broker is slave mode, not allowed to accept message", + "The broker is in slave mode; not allowed to accept messages",
133-167
: Optionally scope rejections to relevant request codes.
Not required, but you can avoid rejecting unrelated codes by using the provided code param.Apply:
- fn reject_request(&self, _code: i32) -> RejectRequestResponse { + fn reject_request(&self, code: i32) -> RejectRequestResponse { + let request_code = RequestCode::from(code); + if !matches!( + request_code, + RequestCode::SendMessage + | RequestCode::SendMessageV2 + | RequestCode::SendBatchMessage + | RequestCode::ConsumerSendMsgBack + ) { + return (false, None); + }
133-167
: Optional: add observability for early rejection paths.
A warn! with reason (slave mode or busy store) will help ops correlate client-side failures.Would you like me to add targeted warn! logs for each early-return branch?
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-broker/src/processor/send_message_processor.rs
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-broker/src/processor/send_message_processor.rs (5)
rocketmq-store/src/config/message_store_config.rs (1)
broker_role
(223-225)rocketmq-broker/src/processor.rs (2)
reject_request
(142-160)reject_request
(235-251)rocketmq-remoting/src/runtime/processor.rs (1)
reject_request
(35-37)rocketmq-remoting/src/protocol/remoting_command.rs (1)
create_response_command_with_code_remark
(196-204)rocketmq-broker/src/broker_runtime.rs (1)
message_store
(1916-1918)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: test
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-broker/src/processor/send_message_processor.rs (3)
25-25
: LGTM: needed enum import.
The BrokerRole import is correct for the new rejection logic.
69-69
: LGTM: correct alias for trait return type.
RejectRequestResponse import matches the trait signature.
133-167
: Checks passed — no action required. broker_role is a public field on MessageStoreConfig (rocketmq-store/src/config/message_store_config.rs); ResponseCode::SystemBusy is defined (rocketmq-remoting/src/code/response_code.rs); MessageStore declares is_os_page_cache_busy and is_transient_store_pool_deficient and LocalFileMessageStore implements them (rocketmq-store/src/base/message_store.rs; rocketmq-store/src/message_store/local_file_message_store.rs).
let message_store = self.inner.broker_runtime_inner.message_store_unchecked(); | ||
if message_store.is_os_page_cache_busy() | ||
|| message_store.is_transient_store_pool_deficient() | ||
{ | ||
return ( | ||
true, | ||
Some(RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SystemBusy, | ||
"The broker message store is busy, please try again later", | ||
)), | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid potential panic: don’t call message_store_unchecked() here.
reject_request can be invoked very early; if the message store hasn’t been initialized, message_store_unchecked() may panic. Prefer checking the Option and return ServiceNotAvailable when absent.
Apply this diff within this block:
- let message_store = self.inner.broker_runtime_inner.message_store_unchecked();
- if message_store.is_os_page_cache_busy()
- || message_store.is_transient_store_pool_deficient()
- {
- return (
- true,
- Some(RemotingCommand::create_response_command_with_code_remark(
- ResponseCode::SystemBusy,
- "The broker message store is busy, please try again later",
- )),
- );
- }
+ if let Some(ms) = self.inner.broker_runtime_inner.message_store().as_ref() {
+ if ms.is_os_page_cache_busy() || ms.is_transient_store_pool_deficient() {
+ return (
+ true,
+ Some(RemotingCommand::create_response_command_with_code_remark(
+ ResponseCode::SystemBusy,
+ "The broker message store is busy, please try again later",
+ )),
+ );
+ }
+ } else {
+ return (
+ true,
+ Some(RemotingCommand::create_response_command_with_code_remark(
+ ResponseCode::ServiceNotAvailable,
+ "Message store unavailable; broker not ready",
+ )),
+ );
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let message_store = self.inner.broker_runtime_inner.message_store_unchecked(); | |
if message_store.is_os_page_cache_busy() | |
|| message_store.is_transient_store_pool_deficient() | |
{ | |
return ( | |
true, | |
Some(RemotingCommand::create_response_command_with_code_remark( | |
ResponseCode::SystemBusy, | |
"The broker message store is busy, please try again later", | |
)), | |
); | |
} | |
if let Some(ms) = self.inner.broker_runtime_inner.message_store().as_ref() { | |
if ms.is_os_page_cache_busy() || ms.is_transient_store_pool_deficient() { | |
return ( | |
true, | |
Some(RemotingCommand::create_response_command_with_code_remark( | |
ResponseCode::SystemBusy, | |
"The broker message store is busy, please try again later", | |
)), | |
); | |
} | |
} else { | |
return ( | |
true, | |
Some(RemotingCommand::create_response_command_with_code_remark( | |
ResponseCode::ServiceNotAvailable, | |
"Message store unavailable; broker not ready", | |
)), | |
); | |
} |
🤖 Prompt for AI Agents
In rocketmq-broker/src/processor/send_message_processor.rs around lines 153 to
164, avoid calling message_store_unchecked() which may panic if the message
store isn't initialized; instead, first attempt to get the Option via
message_store() (or the safe accessor used elsewhere), and if it is None return
(true,
Some(RemotingCommand::create_response_command_with_code_remark(ResponseCode::ServiceNotAvailable,
"The broker message store is not available"))) ; only if Some(message_store)
proceed to call is_os_page_cache_busy() or is_transient_store_pool_deficient()
and return SystemBusy as before.
Which Issue(s) This PR Fixes(Closes)
Fixes #4029
Brief Description
How Did You Test This Change?
Summary by CodeRabbit