-
Notifications
You must be signed in to change notification settings - Fork 173
[ISSUE #4038]🚀Add page_size dependency and enhance memory management in TransientStorePool #4039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…in TransientStorePool
WalkthroughIntroduces a Unix-targeted memory-management layer and integrates it into TransientStorePool. Adds a new error variant, updates locking to parking_lot, changes init/destroy to return results and perform mlock/munlock, wires calls from LocalFileMessageStore, and adds workspace dependencies (rocketmq-error, page_size) with a new utils::ffi module. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App as LocalFileMessageStore
participant Pool as TransientStorePool
participant FFI as utils::ffi
participant OS as OS (Unix)
App->>Pool: init()
loop For each buffer
Pool->>FFI: mlock(addr, len)
FFI->>OS: mlock(…)
OS-->>FFI: rc (0/non-zero)
alt rc != 0
FFI-->>Pool: Err(StoreCustomError("mlock failed"))
Pool-->>App: Err(...)
else rc == 0
FFI-->>Pool: Ok(())
end
end
Pool-->>App: Ok(())
note over App,Pool: App propagates error on init failure
sequenceDiagram
autonumber
actor App as LocalFileMessageStore
participant Pool as TransientStorePool
participant FFI as utils::ffi
participant OS as OS (Unix)
App->>Pool: destroy()
loop For each buffer
Pool->>FFI: munlock(addr, len)
FFI->>OS: munlock(…)
OS-->>FFI: rc
alt rc != 0
FFI-->>Pool: Err(StoreCustomError("munlock failed"))
Pool-->>App: Err(...)
note over App: Caller currently ignores destroy() result
else rc == 0
FFI-->>Pool: Ok(())
end
end
Pool-->>App: Ok()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances memory management in the TransientStorePool by adding page locking functionality and improves error handling. The changes support better memory control for high-performance message storage operations.
- Adds FFI bindings for memory management operations (mlock, munlock, madvise, mincore)
- Updates TransientStorePool to use memory locking and proper error handling
- Replaces std::sync::Mutex with parking_lot::Mutex for better performance
Reviewed Changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 4 comments.
Show a summary per file
File | Description |
---|---|
rocketmq-store/src/utils/ffi.rs | New FFI module providing memory management functions and page size utilities |
rocketmq-store/src/utils.rs | Adds ffi module declaration |
rocketmq-store/src/message_store/local_file_message_store.rs | Updates to handle new error-returning TransientStorePool methods |
rocketmq-store/src/base/transient_store_pool.rs | Enhanced with memory locking, error handling, and parking_lot mutex |
rocketmq-store/Cargo.toml | Adds page_size dependency and changes libc target from linux to unix |
rocketmq-error/src/lib.rs | Adds StoreCustomError variant for store-specific errors |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
pub fn mlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | ||
#[cfg(unix)] | ||
{ | ||
let result = unsafe { libc::mlock(addr as *const c_void, len) }; | ||
if result != 0 { | ||
return Err(RocketmqError::StoreCustomError("mlock failed".to_string())); | ||
} | ||
Ok(()) | ||
} | ||
} |
Copilot
AI
Sep 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function lacks a return statement for non-unix platforms. Add #[cfg(not(unix))] Ok(())
or similar to handle compilation on other platforms.
Copilot uses AI. Check for mistakes.
pub fn munlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | ||
#[cfg(unix)] | ||
{ | ||
let result = unsafe { libc::munlock(addr as *const c_void, len) }; | ||
if result != 0 { | ||
return Err(RocketmqError::StoreCustomError( | ||
"munlock failed".to_string(), | ||
)); | ||
} | ||
Ok(()) | ||
} | ||
} |
Copilot
AI
Sep 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function lacks a return statement for non-unix platforms. Add #[cfg(not(unix))] Ok(())
or similar to handle compilation on other platforms.
Copilot uses AI. Check for mistakes.
pub fn madvise(addr: *const u8, len: usize, advice: i32) -> i32 { | ||
#[cfg(unix)] | ||
{ | ||
unsafe { libc::madvise(addr as *mut c_void, len, advice) } | ||
} | ||
} |
Copilot
AI
Sep 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function lacks a return statement for non-unix platforms. Add #[cfg(not(unix))] 0
or appropriate default value to handle compilation on other platforms.
Copilot uses AI. Check for mistakes.
pub fn mincore(addr: *const u8, len: usize, vec: *const u8) -> i32 { | ||
#[cfg(unix)] | ||
{ | ||
unsafe { libc::mincore(addr as *mut c_void, len, vec as *mut c_uchar) } | ||
} | ||
} |
Copilot
AI
Sep 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function lacks a return statement for non-unix platforms. Add #[cfg(not(unix))] 0
or appropriate default value to handle compilation on other platforms.
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4039 +/- ##
==========================================
- Coverage 26.51% 26.51% -0.01%
==========================================
Files 567 568 +1
Lines 80845 80867 +22
==========================================
Hits 21440 21440
- Misses 59405 59427 +22 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (7)
rocketmq-error/src/lib.rs (1)
159-160
: Add OS error context to StoreCustomError for better diagnosabilityThe new variant works, but errors from mlock/munlock lose errno/context. Prefer embedding
last_os_error()
text so operators can tell why locking failed.If you keep this variant, wire wrappers to produce messages like
"mlock failed: <os error>"
(see my ffi.rs suggestion).rocketmq-store/src/message_store/local_file_message_store.rs (3)
69-69
: Remove unused import
use rocketmq_error::RocketMQResult;
isn’t used (and the file already allows unused imports). Trim it.- use rocketmq_error::RocketMQResult;
706-709
: Preserve context when mapping pool init errorsMap the init error with context for easier debugging.
- match self.transient_store_pool.init() { - Ok(_) => {} - Err(e) => return Err(StoreError::General(e.to_string())), - } + if let Err(e) = self.transient_store_pool.init() { + return Err(StoreError::General(format!("TransientStorePool::init failed: {e}"))); + }
751-752
: Don’t swallow destroy errors silentlyLog failures so operational issues with munlock don’t go unnoticed.
- let _ = self.transient_store_pool.destroy(); + if let Err(e) = self.transient_store_pool.destroy() { + warn!("TransientStorePool::destroy failed: {e}"); + }rocketmq-store/src/utils/ffi.rs (1)
66-71
: Align mincore signature with libc expectation
mincore
writes intovec
; take*mut u8
to reflect mutability and avoid extra casts at call sites.-pub fn mincore(addr: *const u8, len: usize, vec: *const u8) -> i32 { +pub fn mincore(addr: *const u8, len: usize, vec: *mut u8) -> i32 { - #[cfg(unix)] - { - unsafe { libc::mincore(addr as *mut c_void, len, vec as *mut c_uchar) } - } + unsafe { libc::mincore(addr as *mut c_void, len, vec as *mut c_uchar) } }rocketmq-store/src/base/transient_store_pool.rs (2)
57-63
: Ensure no pinned buffers are leaked on destroy
destroy()
only munlocks buffers currently inavailable_buffers
. Borrowed buffers remain pinned if not returned before destroy.
- Track an
outstanding
counter; block (with timeout) until it reaches 0 before draining.- At minimum, warn if
outstanding > 0
.Example change sketch inside the pool (outside this hunk):
- Increment on borrow, decrement on return; in
destroy
, logoutstanding
.
71-79
: Fix threshold math for low pool sizes
self.pool_size / 10 * 4
truncates early; useself.pool_size * 4 / 10
(40%) to avoid zero thresholds for small pools.- if available_buffers.len() < self.pool_size / 10 * 4 { + if available_buffers.len() < (self.pool_size * 4 / 10) {
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (6)
rocketmq-error/src/lib.rs
(1 hunks)rocketmq-store/Cargo.toml
(2 hunks)rocketmq-store/src/base/transient_store_pool.rs
(3 hunks)rocketmq-store/src/message_store/local_file_message_store.rs
(3 hunks)rocketmq-store/src/utils.rs
(1 hunks)rocketmq-store/src/utils/ffi.rs
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-store/src/base/transient_store_pool.rs (2)
rocketmq-store/src/utils/ffi.rs (2)
mlock
(34-43)munlock
(46-57)rocketmq-store/src/message_store/local_file_message_store.rs (2)
init
(683-712)destroy
(754-764)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: test
- GitHub Check: build
- GitHub Check: auto-approve
🔇 Additional comments (5)
rocketmq-store/Cargo.toml (2)
25-25
: LGTM: add rocketmq-error workspace depThis aligns store with shared error types.
59-63
: LGTM: add page_size and move libc under cfg(unix)These are appropriate for the new FFI path.
rocketmq-store/src/base/transient_store_pool.rs (3)
47-55
: LGTM: init pins buffers and surfaces failuresThe mlock-based initialization and
RocketMQResult<()>
API make failures explicit.
82-85
: LGTM: constant-time len read under lockSimple and correct.
93-95
: LGTM: atomicity via parking_lot::MutexSetter/getter are fine with the new lock.
pub(crate) mod ffi; | ||
pub(crate) mod store_util; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gate ffi per-OS and provide Windows stubs to keep cross-platform builds green
ffi.rs
currently compiles on non-Unix but its functions are empty under #[cfg(unix)]
blocks, which will not return and will fail on Windows. Also use libc
is unconditional in ffi.rs while libc
is only a Unix dependency.
Apply this diff to gate the Unix module and add minimal no-op stubs for non-Unix:
- pub(crate) mod ffi;
+#[cfg(unix)]
+pub(crate) mod ffi;
+#[cfg(not(unix))]
+pub(crate) mod ffi {
+ use rocketmq_error::RocketMQResult;
+ #[inline]
+ pub fn get_page_size() -> usize { page_size::get() }
+ #[inline]
+ pub fn mlock(_addr: *const u8, _len: usize) -> RocketMQResult<()> { Ok(()) }
+ #[inline]
+ pub fn munlock(_addr: *const u8, _len: usize) -> RocketMQResult<()> { Ok(()) }
+ #[inline]
+ pub fn madvise(_addr: *const u8, _len: usize, _advice: i32) -> i32 { 0 }
+ #[inline]
+ pub fn mincore(_addr: *const u8, _len: usize, _vec: *const u8) -> i32 { 0 }
+}
pub(crate) mod store_util;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub(crate) mod ffi; | |
pub(crate) mod store_util; | |
#[cfg(unix)] | |
pub(crate) mod ffi; | |
#[cfg(not(unix))] | |
pub(crate) mod ffi { | |
use rocketmq_error::RocketMQResult; | |
#[inline] | |
pub fn get_page_size() -> usize { page_size::get() } | |
#[inline] | |
pub fn mlock(_addr: *const u8, _len: usize) -> RocketMQResult<()> { Ok(()) } | |
#[inline'] | |
pub fn munlock(_addr: *const u8, _len: usize) -> RocketMQResult<()> { Ok(()) } | |
#[inline] | |
pub fn madvise(_addr: *const u8, _len: usize, _advice: i32) -> i32 { 0 } | |
#[inline] | |
pub fn mincore(_addr: *const u8, _len: usize, _vec: *const u8) -> i32 { 0 } | |
} | |
pub(crate) mod store_util; |
🤖 Prompt for AI Agents
In rocketmq-store/src/utils.rs around lines 18 to 19, the ffi module is
unconditionally declared causing non-Unix builds to compile code that uses
Unix-only libc and functions that are conditionally defined with #[cfg(unix)]
leading to missing implementations on Windows; change the module declarations to
gate the Unix ffi behind #[cfg(unix)] (and keep a non-Unix stub module behind
#[cfg(not(unix))]) and move any `use libc` into the Unix-only module; implement
minimal no-op/placeholder functions in the non-Unix stub that match the Unix ffi
public signatures so Windows and other platforms have compilable stubs that
return sensible defaults or errors.
use std::ffi::c_void; | ||
|
||
use libc::c_uchar; | ||
use rocketmq_error::RocketMQResult; | ||
use rocketmq_error::RocketmqError; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gate the entire module to Unix to fix non-Unix builds
Unconditional use libc::...
will fail on Windows where libc
isn’t a dependency. Gate the file.
+#![cfg(unix)]
use std::ffi::c_void;
-use libc::c_uchar;
+use libc::c_uchar;
use rocketmq_error::RocketMQResult;
use rocketmq_error::RocketmqError;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
use std::ffi::c_void; | |
use libc::c_uchar; | |
use rocketmq_error::RocketMQResult; | |
use rocketmq_error::RocketmqError; | |
#![cfg(unix)] | |
use std::ffi::c_void; | |
use libc::c_uchar; | |
use rocketmq_error::RocketMQResult; | |
use rocketmq_error::RocketmqError; |
🤖 Prompt for AI Agents
In rocketmq-store/src/utils/ffi.rs around lines 17 to 22, the file
unconditionally imports libc which breaks Windows builds; gate the entire module
to Unix by adding a module-level cfg (e.g., #[cfg(unix)] or #[cfg(target_family
= "unix")]) at the top of the file so the contents are compiled only on Unix
targets; if the crate expects a Windows-compatible build, add a separate Windows
stub file or guard any libc-dependent items behind the same cfg so Windows
builds do not reference libc.
#[inline] | ||
pub fn mlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | ||
#[cfg(unix)] | ||
{ | ||
let result = unsafe { libc::mlock(addr as *const c_void, len) }; | ||
if result != 0 { | ||
return Err(RocketmqError::StoreCustomError("mlock failed".to_string())); | ||
} | ||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Include errno in mlock error
Surface last_os_error()
to aid diagnosis (e.g., RLIMIT_MEMLOCK exceeded).
pub fn mlock(addr: *const u8, len: usize) -> RocketMQResult<()> {
- #[cfg(unix)]
- {
- let result = unsafe { libc::mlock(addr as *const c_void, len) };
- if result != 0 {
- return Err(RocketmqError::StoreCustomError("mlock failed".to_string()));
- }
- Ok(())
- }
+ let result = unsafe { libc::mlock(addr as *const c_void, len) };
+ if result != 0 {
+ return Err(RocketmqError::StoreCustomError(format!(
+ "mlock failed: {}",
+ std::io::Error::last_os_error()
+ )));
+ }
+ Ok(())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
#[inline] | |
pub fn mlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | |
#[cfg(unix)] | |
{ | |
let result = unsafe { libc::mlock(addr as *const c_void, len) }; | |
if result != 0 { | |
return Err(RocketmqError::StoreCustomError("mlock failed".to_string())); | |
} | |
Ok(()) | |
} | |
} | |
#[inline] | |
pub fn mlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | |
#[cfg(unix)] | |
{ | |
let result = unsafe { libc::mlock(addr as *const c_void, len) }; | |
if result != 0 { | |
return Err(RocketmqError::StoreCustomError(format!( | |
"mlock failed: {}", | |
std::io::Error::last_os_error() | |
))); | |
} | |
Ok(()) | |
} | |
} |
🤖 Prompt for AI Agents
In rocketmq-store/src/utils/ffi.rs around lines 33 to 43, the mlock failure path
currently returns a generic "mlock failed" error; update it to capture the OS
error and include it in the RocketmqError message. After the unsafe libc::mlock
call, on non-zero result create a std::io::Error via
std::io::Error::last_os_error() (or format its raw_os_error()/to_string()) and
embed that string into RocketmqError::StoreCustomError so the returned error
surfaces errno/details (e.g., "mlock failed: {os_error}").
#[inline] | ||
pub fn munlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | ||
#[cfg(unix)] | ||
{ | ||
let result = unsafe { libc::munlock(addr as *const c_void, len) }; | ||
if result != 0 { | ||
return Err(RocketmqError::StoreCustomError( | ||
"munlock failed".to_string(), | ||
)); | ||
} | ||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Include errno in munlock error
Same rationale as mlock.
pub fn munlock(addr: *const u8, len: usize) -> RocketMQResult<()> {
- #[cfg(unix)]
- {
- let result = unsafe { libc::munlock(addr as *const c_void, len) };
- if result != 0 {
- return Err(RocketmqError::StoreCustomError(
- "munlock failed".to_string(),
- ));
- }
- Ok(())
- }
+ let result = unsafe { libc::munlock(addr as *const c_void, len) };
+ if result != 0 {
+ return Err(RocketmqError::StoreCustomError(format!(
+ "munlock failed: {}",
+ std::io::Error::last_os_error()
+ )));
+ }
+ Ok(())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
#[inline] | |
pub fn munlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | |
#[cfg(unix)] | |
{ | |
let result = unsafe { libc::munlock(addr as *const c_void, len) }; | |
if result != 0 { | |
return Err(RocketmqError::StoreCustomError( | |
"munlock failed".to_string(), | |
)); | |
} | |
Ok(()) | |
} | |
} | |
#[inline] | |
pub fn munlock(addr: *const u8, len: usize) -> RocketMQResult<()> { | |
let result = unsafe { libc::munlock(addr as *const c_void, len) }; | |
if result != 0 { | |
return Err(RocketmqError::StoreCustomError(format!( | |
"munlock failed: {}", | |
std::io::Error::last_os_error() | |
))); | |
} | |
Ok(()) | |
} |
🤖 Prompt for AI Agents
In rocketmq-store/src/utils/ffi.rs around lines 45 to 57, the munlock error path
currently returns a generic "munlock failed" without system error details;
change it to capture the OS errno (e.g. via std::io::Error::last_os_error())
after the unsafe libc::munlock call and include that error's display (or its raw
errno) in the RocketmqError::StoreCustomError message so the returned error
contains the errno and human-readable description; keep the existing cfg(unix)
block and return Ok(()) on success.
Which Issue(s) This PR Fixes(Closes)
Fixes #4038
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Performance
Refactor
Chores