Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rocketmq-namesrv/src/bin/namesrv_bootstrap_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() -> Result<()> {
})
.build()
.boot()
.await;
.await?;

Ok(())
}
Expand Down
15 changes: 10 additions & 5 deletions rocketmq-namesrv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use cheetah_string::CheetahString;
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_common::utils::network_util::NetworkUtil;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::base::channel_event_listener::ChannelEventListener;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
Expand Down Expand Up @@ -62,13 +63,15 @@ struct NameServerRuntime {
}

impl NameServerBootstrap {
pub async fn boot(mut self) {
pub async fn boot(mut self) -> RocketMQResult<()> {
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
self.name_server_runtime.shutdown_rx = Some(shutdown_rx);
self.name_server_runtime.initialize().await?;
tokio::join!(
self.name_server_runtime.start(),
wait_for_signal_inner(shutdown_tx)
);
Ok(())
}
}

Expand All @@ -84,19 +87,21 @@ async fn wait_for_signal_inner(shutdown_tx: broadcast::Sender<()>) {
}

impl NameServerRuntime {
pub async fn initialize(&mut self) {
self.load_config().await;
pub async fn initialize(&mut self) -> RocketMQResult<()> {
self.load_config().await?;
self.initiate_network_components();
self.register_processor();
self.start_schedule_service();
self.initiate_ssl_context();
self.initiate_rpc_hooks();
Ok(())
}

async fn load_config(&mut self) {
async fn load_config(&mut self) -> RocketMQResult<()> {
if let Some(kv_config_manager) = self.inner.kvconfig_manager.as_mut() {
kv_config_manager.load();
kv_config_manager.load()?;
}
Ok(())
}
fn initiate_network_components(&mut self) {
//nothing to do
Expand Down
9 changes: 5 additions & 4 deletions rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::FileUtils;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::protocol::body::kv_table::KVTable;
use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_rust::ArcMut;
use tracing::error;
Expand Down Expand Up @@ -82,23 +83,23 @@ impl KVConfigManager {

impl KVConfigManager {
/// Loads key-value configurations from a file.
pub fn load(&mut self) {
pub fn load(&mut self) -> RocketMQResult<()> {
let result = FileUtils::file_to_string(
self.name_server_runtime_inner
.name_server_config()
.kv_config_path
.as_str(),
);
if let Ok(content) = result {
let wrapper =
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
if let Some(config_table) = wrapper.config_table {
for (key, value) in config_table {
self.config_table.insert(key, value);
}
info!("load KV config success");
Comment on lines +94 to 99
Copy link

Copilot AI Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change from SerdeJsonUtils::decode() to KVConfigSerializeWrapper::decode() may introduce a breaking change if the deserialization behavior differs. Ensure that KVConfigSerializeWrapper::decode() provides the same error handling and deserialization logic as the previous implementation.

Suggested change
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
if let Some(config_table) = wrapper.config_table {
for (key, value) in config_table {
self.config_table.insert(key, value);
}
info!("load KV config success");
match KVConfigSerializeWrapper::decode(content.as_bytes()) {
Ok(wrapper) => {
if let Some(config_table) = wrapper.config_table {
for (key, value) in config_table {
self.config_table.insert(key, value);
}
info!("load KV config success");
}
}
Err(e) => {
error!("Failed to deserialize KV config: {:?}", e);
return Err(e.into());
}

Copilot uses AI. Check for mistakes.

}
}
Ok(())
}
Comment on lines +86 to 103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Propagate file I/O errors and skip decoding empty content to avoid startup failure

FileUtils::file_to_string returns Ok("") when the file doesn't exist; decoding an empty string will now error and bubble up, breaking boot. Also, actual I/O errors are currently swallowed. Propagate I/O errors, and early-return on empty content.

-    pub fn load(&mut self) -> RocketMQResult<()> {
-        let result = FileUtils::file_to_string(
-            self.name_server_runtime_inner
-                .name_server_config()
-                .kv_config_path
-                .as_str(),
-        );
-        if let Ok(content) = result {
-            let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
-            if let Some(config_table) = wrapper.config_table {
-                for (key, value) in config_table {
-                    self.config_table.insert(key, value);
-                }
-                info!("load KV config success");
-            }
-        }
-        Ok(())
-    }
+    pub fn load(&mut self) -> RocketMQResult<()> {
+        let path = self
+            .name_server_runtime_inner
+            .name_server_config()
+            .kv_config_path
+            .as_str();
+        let content = FileUtils::file_to_string(path)?;
+        if content.trim().is_empty() {
+            // Missing or empty file is not an error; nothing to load.
+            return Ok(());
+        }
+        let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
+        if let Some(config_table) = wrapper.config_table {
+            for (key, value) in config_table {
+                self.config_table.insert(key, value);
+            }
+            info!("load KV config success");
+        }
+        Ok(())
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn load(&mut self) -> RocketMQResult<()> {
let result = FileUtils::file_to_string(
self.name_server_runtime_inner
.name_server_config()
.kv_config_path
.as_str(),
);
if let Ok(content) = result {
let wrapper =
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
if let Some(config_table) = wrapper.config_table {
for (key, value) in config_table {
self.config_table.insert(key, value);
}
info!("load KV config success");
}
}
Ok(())
}
pub fn load(&mut self) -> RocketMQResult<()> {
let path = self
.name_server_runtime_inner
.name_server_config()
.kv_config_path
.as_str();
let content = FileUtils::file_to_string(path)?;
if content.trim().is_empty() {
// Missing or empty file is not an error; nothing to load.
return Ok(());
}
let wrapper = KVConfigSerializeWrapper::decode(content.as_bytes())?;
if let Some(config_table) = wrapper.config_table {
for (key, value) in config_table {
self.config_table.insert(key, value);
}
info!("load KV config success");
}
Ok(())
}
🤖 Prompt for AI Agents
In rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs around lines 86 to 103,
the current implementation swallows file I/O errors and attempts to decode an
empty string (which causes a startup failure); change the code to propagate I/O
errors by using the ? operator on FileUtils::file_to_string so real read errors
return early, then check if the returned content is empty and return Ok(()) (or
log and skip) before attempting to decode, and only call
KVConfigSerializeWrapper::decode when content is non-empty so decoding empty
content is avoided.


/// Updates the Namesrv configuration.
Expand Down
Loading