-
Notifications
You must be signed in to change notification settings - Fork 173
[ISSUE #4014]♻️Refactor: Update async functions to return RocketMQResult for better error handling #4015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ult for better error handling
WalkthroughRefactors nameserver startup to propagate errors using RocketMQResult. main awaits boot and returns errors. NameServerBootstrap::boot and NameServerRuntime::initialize now return RocketMQResult and use ? to propagate failures. KVConfigManager::load returns RocketMQResult, switches to decode via RemotingDeserializable, and propagates decode errors. Changes
Sequence Diagram(s)sequenceDiagram
actor CLI as Process
participant Main as namesrv_bootstrap_server::main
participant Boot as NameServerBootstrap::boot()
participant Runtime as NameServerRuntime::initialize()
participant KV as KVConfigManager::load()
CLI->>Main: start
Main->>Boot: await boot()
Boot->>Runtime: await initialize()
Runtime->>KV: load()
alt decode/read OK
KV-->>Runtime: Ok(())
Runtime-->>Boot: Ok(())
Boot-->>Main: Ok(())
Main-->>CLI: Ok(()) exit 0
else error
KV-->>Runtime: Err(e)
Runtime-->>Boot: Err(e)
Boot-->>Main: Err(e)
Main-->>CLI: Err(e) exit non-zero
end
note over Boot,Runtime: Errors propagate via RocketMQResult and ?
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Pre-merge checks (4 passed, 1 inconclusive)❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
Poem
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors async functions in the RocketMQ Namesrv component to return RocketMQResult<()>
for better error handling instead of returning ()
and using unwrap()
calls.
- Replaces
unwrap()
calls with proper error propagation using the?
operator - Updates function signatures to return
RocketMQResult<()>
for consistent error handling - Removes dependency on
SerdeJsonUtils
in favor of direct deserialization methods
Reviewed Changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs | Updates load() method to return RocketMQResult<()> and replaces unwrap() with proper error handling |
rocketmq-namesrv/src/bootstrap.rs | Updates bootstrap methods to return RocketMQResult<()> and propagate errors using ? operator |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?; | ||
if let Some(config_table) = wrapper.config_table { | ||
for (key, value) in config_table { | ||
self.config_table.insert(key, value); | ||
} | ||
info!("load KV config success"); |
Copilot
AI
Sep 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change from SerdeJsonUtils::decode()
to KVConfigSerializeWrapper::decode()
may introduce a breaking change if the deserialization behavior differs. Ensure that KVConfigSerializeWrapper::decode()
provides the same error handling and deserialization logic as the previous implementation.
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?; | |
if let Some(config_table) = wrapper.config_table { | |
for (key, value) in config_table { | |
self.config_table.insert(key, value); | |
} | |
info!("load KV config success"); | |
match KVConfigSerializeWrapper::decode(content.as_bytes()) { | |
Ok(wrapper) => { | |
if let Some(config_table) = wrapper.config_table { | |
for (key, value) in config_table { | |
self.config_table.insert(key, value); | |
} | |
info!("load KV config success"); | |
} | |
} | |
Err(e) => { | |
error!("Failed to deserialize KV config: {:?}", e); | |
return Err(e.into()); | |
} |
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4015 +/- ##
==========================================
- Coverage 26.58% 26.58% -0.01%
==========================================
Files 565 565
Lines 80699 80703 +4
==========================================
Hits 21453 21453
- Misses 59246 59250 +4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (1)
117-121
: Avoid panic on serialization during persist()
serde_json::to_string(...).unwrap()
can panic; handle the error and log instead to keep the process stable.- let content = serde_json::to_string(&wrapper).unwrap(); + let content = match serde_json::to_string(&wrapper) { + Ok(s) => s, + Err(e) => { + error!("serialize KV config failed: {}", e); + return; + } + };rocketmq-namesrv/src/bootstrap.rs (1)
90-98
: Make load_config synchronous (no awaits) to simplify the init path
load_config
performs no async work; make it sync and drop the unnecessary.await
ininitialize
for clarity and less boilerplate.- pub async fn initialize(&mut self) -> RocketMQResult<()> { - self.load_config().await?; + pub async fn initialize(&mut self) -> RocketMQResult<()> { + self.load_config()?; self.initiate_network_components(); self.register_processor(); self.start_schedule_service(); self.initiate_ssl_context(); self.initiate_rpc_hooks(); Ok(()) } - async fn load_config(&mut self) -> RocketMQResult<()> { + fn load_config(&mut self) -> RocketMQResult<()> { if let Some(kv_config_manager) = self.inner.kvconfig_manager.as_mut() { kv_config_manager.load()?; } Ok(()) }Also applies to: 100-105
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-namesrv/src/bin/namesrv_bootstrap_server.rs
(1 hunks)rocketmq-namesrv/src/bootstrap.rs
(3 hunks)rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (3)
rocketmq-common/src/utils/file_utils.rs (1)
file_to_string
(30-37)rocketmq-remoting/src/protocol.rs (2)
decode
(464-464)decode
(485-487)rocketmq-common/src/utils/serde_json_utils.rs (1)
decode
(21-26)
rocketmq-namesrv/src/bootstrap.rs (3)
rocketmq-broker/src/broker_bootstrap.rs (4)
boot
(32-41)start
(47-49)wait_for_signal_inner
(52-60)initialize
(43-45)rocketmq-remoting/src/clients/client.rs (1)
tokio
(251-251)rocketmq-remoting/src/net/channel.rs (1)
tokio
(221-221)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (1)
23-26
: Imports align with the new result-based flowAdding
RocketMQResult
andRemotingDeserializable
is correct and consistent with the PR objective.rocketmq-namesrv/src/bin/namesrv_bootstrap_server.rs (1)
62-64
: Good: propagate boot errors to mainUsing
.await?
ensures startup failures surface to the process exit code.rocketmq-namesrv/src/bootstrap.rs (1)
66-75
: Boot now returns RocketMQResult — OKClear propagation of initialization errors, then graceful shutdown via join; returning
Ok(())
is appropriate.
pub fn load(&mut self) -> RocketMQResult<()> { | ||
let result = FileUtils::file_to_string( | ||
self.name_server_runtime_inner | ||
.name_server_config() | ||
.kv_config_path | ||
.as_str(), | ||
); | ||
if let Ok(content) = result { | ||
let wrapper = | ||
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap(); | ||
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?; | ||
if let Some(config_table) = wrapper.config_table { | ||
for (key, value) in config_table { | ||
self.config_table.insert(key, value); | ||
} | ||
info!("load KV config success"); | ||
} | ||
} | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Propagate file I/O errors and skip decoding empty content to avoid startup failure
FileUtils::file_to_string
returns Ok("") when the file doesn't exist; decoding an empty string will now error and bubble up, breaking boot. Also, actual I/O errors are currently swallowed. Propagate I/O errors, and early-return on empty content.
- pub fn load(&mut self) -> RocketMQResult<()> {
- let result = FileUtils::file_to_string(
- self.name_server_runtime_inner
- .name_server_config()
- .kv_config_path
- .as_str(),
- );
- if let Ok(content) = result {
- let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
- if let Some(config_table) = wrapper.config_table {
- for (key, value) in config_table {
- self.config_table.insert(key, value);
- }
- info!("load KV config success");
- }
- }
- Ok(())
- }
+ pub fn load(&mut self) -> RocketMQResult<()> {
+ let path = self
+ .name_server_runtime_inner
+ .name_server_config()
+ .kv_config_path
+ .as_str();
+ let content = FileUtils::file_to_string(path)?;
+ if content.trim().is_empty() {
+ // Missing or empty file is not an error; nothing to load.
+ return Ok(());
+ }
+ let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
+ if let Some(config_table) = wrapper.config_table {
+ for (key, value) in config_table {
+ self.config_table.insert(key, value);
+ }
+ info!("load KV config success");
+ }
+ Ok(())
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub fn load(&mut self) -> RocketMQResult<()> { | |
let result = FileUtils::file_to_string( | |
self.name_server_runtime_inner | |
.name_server_config() | |
.kv_config_path | |
.as_str(), | |
); | |
if let Ok(content) = result { | |
let wrapper = | |
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap(); | |
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?; | |
if let Some(config_table) = wrapper.config_table { | |
for (key, value) in config_table { | |
self.config_table.insert(key, value); | |
} | |
info!("load KV config success"); | |
} | |
} | |
Ok(()) | |
} | |
pub fn load(&mut self) -> RocketMQResult<()> { | |
let path = self | |
.name_server_runtime_inner | |
.name_server_config() | |
.kv_config_path | |
.as_str(); | |
let content = FileUtils::file_to_string(path)?; | |
if content.trim().is_empty() { | |
// Missing or empty file is not an error; nothing to load. | |
return Ok(()); | |
} | |
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?; | |
if let Some(config_table) = wrapper.config_table { | |
for (key, value) in config_table { | |
self.config_table.insert(key, value); | |
} | |
info!("load KV config success"); | |
} | |
Ok(()) | |
} |
🤖 Prompt for AI Agents
In rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs around lines 86 to 103,
the current implementation swallows file I/O errors and attempts to decode an
empty string (which causes a startup failure); change the code to propagate I/O
errors by using the ? operator on FileUtils::file_to_string so real read errors
return early, then check if the returned content is empty and return Ok(()) (or
log and skip) before attempting to decode, and only call
KVConfigSerializeWrapper::decode when content is non-empty so decoding empty
content is avoided.
Which Issue(s) This PR Fixes(Closes)
Fixes #4014
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes
Refactor