Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
Expand Down Expand Up @@ -114,6 +115,7 @@
#[cfg(feature = "local_file_store")]
pull_request_hold_service: Option<PullRequestHoldService<DefaultMessageStore>>,
rebalance_lock_manager: Arc<RebalanceLockManager>,
broker_member_group: Arc<BrokerMemberGroup>,
}

impl Clone for BrokerRuntime {
Expand Down Expand Up @@ -147,6 +149,7 @@
is_isolated: self.is_isolated.clone(),
pull_request_hold_service: self.pull_request_hold_service.clone(),
rebalance_lock_manager: self.rebalance_lock_manager.clone(),
broker_member_group: self.broker_member_group.clone(),

Check warning on line 152 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L152

Added line #L152 was not covered by tests
}
}
}
Expand Down Expand Up @@ -190,6 +193,14 @@
}));
let broker_stats_manager = Arc::new(stats_manager);
consumer_manager.set_broker_stats_manager(Some(Arc::downgrade(&broker_stats_manager)));
let mut broker_member_group = BrokerMemberGroup::new(
broker_config.broker_identity.broker_cluster_name.clone(),
broker_config.broker_identity.broker_name.clone(),
);
broker_member_group.broker_addrs.insert(
broker_config.broker_identity.broker_id,
broker_config.get_broker_addr(),
);

Check warning on line 203 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L196-L203

Added lines #L196 - L203 were not covered by tests
Self {
broker_config: broker_config.clone(),
message_store_config,
Expand Down Expand Up @@ -222,6 +233,7 @@
is_isolated: Arc::new(AtomicBool::new(false)),
pull_request_hold_service: None,
rebalance_lock_manager: Arc::new(Default::default()),
broker_member_group: Arc::new(broker_member_group),

Check warning on line 236 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L236

Added line #L236 was not covered by tests
}
}

Expand Down Expand Up @@ -471,6 +483,8 @@
self.consumer_manager.clone(),
self.broker_out_api.clone(),
self.broker_stats_manager.clone(),
self.rebalance_lock_manager.clone(),
self.broker_member_group.clone(),

Check warning on line 487 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L486-L487

Added lines #L486 - L487 were not covered by tests
);

BrokerRequestProcessor {
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/client/net/broker_to_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;

use crate::error::BrokerError::BrokerClientError;
use crate::BrokerResult;
use crate::Result;

#[derive(Default, Clone)]
pub struct Broker2Client;
Expand All @@ -29,7 +29,7 @@
channel: &mut Channel,
request: RemotingCommand,
timeout_millis: u64,
) -> BrokerResult<RemotingCommand> {
) -> Result<RemotingCommand> {

Check warning on line 32 in rocketmq-broker/src/client/net/broker_to_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/net/broker_to_client.rs#L32

Added line #L32 was not covered by tests
match channel.send_wait_response(request, timeout_millis).await {
Ok(value) => Ok(value),
Err(e) => Err(BrokerClientError(e)),
Expand Down
80 changes: 46 additions & 34 deletions rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;

Expand All @@ -34,7 +35,7 @@
};
}

type MessageQueueLockTable = HashMap<String, HashMap<Arc<MessageQueue>, LockEntry>>;
type MessageQueueLockTable = HashMap<String, HashMap<MessageQueue, LockEntry>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure MessageQueue implements Hash and Eq correctly for use as HashMap keys

By changing the MessageQueueLockTable to use MessageQueue directly as keys instead of Arc<MessageQueue>, it's essential to ensure that MessageQueue properly implements the Hash and Eq traits. This guarantees correct behavior when storing and retrieving entries from the HashMap, preventing potential issues with key collisions or incorrect lookups.


#[derive(Clone, Default)]
pub struct RebalanceLockManager {
Expand All @@ -60,26 +61,25 @@
pub fn try_lock_batch(
&self,
group: &str,
mqs: Vec<Arc<MessageQueue>>,
mqs: &HashSet<MessageQueue>,
client_id: &str,
) -> Vec<Arc<MessageQueue>> {
let mut lock_mqs = Vec::new();
let mut not_locked_mqs = Vec::new();
) -> HashSet<MessageQueue> {
let mut lock_mqs = HashSet::with_capacity(mqs.len());
let mut not_locked_mqs = HashSet::with_capacity(mqs.len());
Comment on lines +67 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid unnecessary cloning of MessageQueue when inserting into HashSet

Cloning MessageQueue instances with mq.clone() may introduce performance overhead, especially if MessageQueue is a complex struct. Consider iterating over references and adjusting ownership to minimize or eliminate the need for cloning.

Apply this diff to reduce cloning:

 for mq in mqs.iter() {
     if self.is_locked(group, mq, client_id) {
-        lock_mqs.insert(mq.clone());
+        lock_mqs.insert((*mq).clone());
     } else {
-        not_locked_mqs.insert(mq.clone());
+        not_locked_mqs.insert((*mq).clone());
     }
 }

Committable suggestion was skipped due to low confidence.

for mq in mqs.iter() {
if self.is_locked(group, mq, client_id) {
lock_mqs.push(mq.clone());
lock_mqs.insert(mq.clone());

Check warning on line 71 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L71

Added line #L71 was not covered by tests
} else {
not_locked_mqs.push(mq.clone());
not_locked_mqs.insert(mq.clone());
}
}
if !not_locked_mqs.is_empty() {
let mut write_guard = self.mq_lock_table.write();
let mut group_value = write_guard.get_mut(group);
if group_value.is_none() {
group_value = Some(write_guard.entry(group.to_string()).or_default());
}
let group_value = group_value.unwrap();
for mq in not_locked_mqs.iter() {
let group_value = write_guard
.entry(group.to_string())
.or_insert(HashMap::with_capacity(32));

for mq in not_locked_mqs {
let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| {
info!(
"RebalanceLockManager#tryLockBatch: lock a message which has not been \
Expand All @@ -96,7 +96,7 @@
get_current_millis() as i64,
std::sync::atomic::Ordering::Relaxed,
);
lock_mqs.push(mq.clone());
lock_mqs.insert(mq);
continue;
}
let old_client_id = lock_entry.client_id.as_str().to_string();
Expand All @@ -106,12 +106,12 @@
get_current_millis() as i64,
std::sync::atomic::Ordering::Relaxed,
);
lock_mqs.push(mq.clone());
warn!(
"RebalanceLockManager#tryLockBatch: try to lock a expired message queue, \
group={} mq={:?}, old client id={}, new client id={}",
group, mq, old_client_id, client_id
);
lock_mqs.insert(mq);

Check warning on line 114 in rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs#L114

Added line #L114 was not covered by tests
continue;
}
warn!(
Expand All @@ -124,7 +124,7 @@
lock_mqs
}

pub fn unlock_batch(&self, group: &str, mqs: Vec<Arc<MessageQueue>>, client_id: &str) {
pub fn unlock_batch(&self, group: &str, mqs: &HashSet<MessageQueue>, client_id: &str) {
let mut write_guard = self.mq_lock_table.write();
let group_value = write_guard.get_mut(group);
if group_value.is_none() {
Expand Down Expand Up @@ -163,7 +163,7 @@
}
}

fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool {
fn is_locked(&self, group: &str, mq: &MessageQueue, client_id: &str) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Confirm thread safety without Arc<MessageQueue>

By removing Arc<MessageQueue> and using MessageQueue directly, confirm that thread safety is maintained, especially if MessageQueue contains internal mutability or is shared across threads. The use of RwLock helps with synchronization, but it's important to ensure no data races occur.

let lock_table = self.mq_lock_table.read();
let group_value = lock_table.get(group);
if group_value.is_none() {
Expand Down Expand Up @@ -231,59 +231,71 @@
#[test]
fn lock_all_expired_returns_false_when_active_locks_exist() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
manager.try_lock_batch("test_group", &set, "client_1");
Comment on lines +234 to +237
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace repetitive test setup code with a helper function

The test cases repeatedly create a default MessageQueue and insert it into a HashSet. Consider refactoring this common setup into a helper function to adhere to the DRY (Don't Repeat Yourself) principle, improving maintainability and readability.

Implement a helper function for creating the test HashSet<MessageQueue>:

fn create_test_mq_set() -> HashSet<MessageQueue> {
    let mq = MessageQueue::default();
    let mut set = HashSet::new();
    set.insert(mq);
    set
}

Then update the tests:

 #[test]
 fn lock_all_expired_returns_false_when_active_locks_exist() {
     let manager = RebalanceLockManager::default();
-    let mq = MessageQueue::default();
-    let mut set = HashSet::new();
-    set.insert(mq.clone());
+    let set = create_test_mq_set();
     manager.try_lock_batch("test_group", &set, "client_1");
     assert!(!manager.is_lock_all_expired("test_group"));
 }

Repeat for other test cases.

Also applies to: 244-247, 254-258, 265-270, 277-281, 288-291, 298-298

assert!(!manager.is_lock_all_expired("test_group"));
}

#[test]
fn try_lock_batch_locks_message_queues_for_new_group() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
let locked_mqs = manager.try_lock_batch("test_group", &set, "client_1");
assert_eq!(locked_mqs.len(), 1);
}

#[test]
fn try_lock_batch_does_not_lock_already_locked_message_queues() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
manager.try_lock_batch("test_group", &set, "client_1");
let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2");
assert!(locked_mqs.is_empty());
}

#[test]
fn unlock_batch_unlocks_message_queues_locked_by_client() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
manager.unlock_batch("test_group", vec![mq.clone()], "client_1");
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
manager.try_lock_batch("test_group", &set, "client_1");
manager.unlock_batch("test_group", &set, "client_1");
let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2");
assert_eq!(locked_mqs.len(), 1);
}

#[test]
fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
manager.unlock_batch("test_group", vec![mq.clone()], "client_2");
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
manager.try_lock_batch("test_group", &set, "client_1");
manager.unlock_batch("test_group", &set, "client_2");
assert!(!manager.is_lock_all_expired("test_group"));
}

#[test]
fn is_locked_returns_true_for_locked_message_queue() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq.clone());
manager.try_lock_batch("test_group", &set, "client_1");
assert!(manager.is_locked("test_group", &mq, "client_1"));
}

#[test]
fn is_locked_returns_false_for_unlocked_message_queue() {
let manager = RebalanceLockManager::default();
let mq = Arc::new(MessageQueue::default());
let mq = MessageQueue::default();
assert!(!manager.is_locked("test_group", &mq, "client_1"));
}
}
3 changes: 3 additions & 0 deletions rocketmq-broker/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@
pub enum BrokerError {
#[error("broker client error: {0}")]
BrokerClientError(#[from] rocketmq_remoting::error::Error),

#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
MQBrokerError(i32, String, String),

Check warning on line 25 in rocketmq-broker/src/error.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/error.rs#L24-L25

Added lines #L24 - L25 were not covered by tests
}
2 changes: 1 addition & 1 deletion rocketmq-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ pub(crate) mod topic;
pub(crate) mod util;

type RemotingError = rocketmq_remoting::error::Error;
type BrokerResult<T> = Result<T, BrokerError>;
type Result<T> = std::result::Result<T, BrokerError>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Remaining usages of BrokerResult found.

Please update the following files to replace BrokerResult with the new Result<T> type:

  • rocketmq-remoting/src/protocol/namesrv.rs
  • rocketmq-namesrv/src/route/route_info_manager.rs
  • rocketmq-broker/src/out_api/broker_outer_api.rs
  • rocketmq-client/src/implementation/find_broker_result.rs
  • rocketmq-client/src/factory/mq_client_instance.rs
🔗 Analysis chain

Approve the type alias change and verify its usage.

The change from BrokerResult<T> to Result<T> is a good improvement. It aligns with Rust's naming conventions and simplifies the code while maintaining the same functionality.

To ensure this change doesn't break existing code, please run the following script to verify the usage of both BrokerResult and Result across the codebase:

Please review the script output and update any remaining occurrences of BrokerResult to use the new Result type.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of BrokerResult and Result types

# Test 1: Check for any remaining usage of BrokerResult
echo "Checking for remaining usage of BrokerResult:"
rg --type rust 'BrokerResult'

# Test 2: Verify the usage of the new Result type
echo "Verifying usage of the new Result type:"
rg --type rust 'Result<.*>'

# Note: If Test 1 returns results, those occurrences need to be updated to use the new Result type.
# If Test 2 doesn't return results (other than in lib.rs), it might indicate that the new type isn't being used yet.

Length of output: 48328

42 changes: 42 additions & 0 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Weak;

use dns_lookup::lookup_host;
use rocketmq_common::common::broker::broker_config::BrokerIdentity;
use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::utils::crc32_utils;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::ArcRefCellWrapper;
Expand All @@ -29,14 +31,17 @@
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody;
use rocketmq_remoting::protocol::body::kv_table::KVTable;
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
use rocketmq_remoting::protocol::namesrv::RegisterBrokerResult;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::route::route_data_view::QueueData;
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::remoting::RemotingService;
use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
Expand All @@ -48,6 +53,10 @@
use tracing::error;
use tracing::info;

use crate::error::BrokerError;
use crate::error::BrokerError::BrokerClientError;
use crate::Result;

pub struct BrokerOuterAPI {
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
name_server_address: Option<String>,
Expand Down Expand Up @@ -317,6 +326,39 @@
pub fn rpc_client(&self) -> &RpcClientImpl {
&self.rpc_client
}

pub async fn lock_batch_mq_async(
&self,
addr: String,
request_body: bytes::Bytes,
timeout_millis: u64,
) -> Result<HashSet<MessageQueue>> {
let mut request = RemotingCommand::create_request_command(
RequestCode::LockBatchMq,
LockBatchMqRequestHeader::default(),
);
request.set_body_mut_ref(Some(request_body));
let result = self

Check warning on line 341 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L330-L341

Added lines #L330 - L341 were not covered by tests
.remoting_client
.invoke_async(Some(addr), request, timeout_millis)
.await;
match result {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {

Check warning on line 347 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L343-L347

Added lines #L343 - L347 were not covered by tests
let lock_batch_response_body =
LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
Ok(lock_batch_response_body.lock_ok_mq_set)

Check warning on line 350 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L349-L350

Added lines #L349 - L350 were not covered by tests
Comment on lines +349 to +350
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid using unwrap() to handle potential None values

Using unwrap() on response.get_body() can lead to a panic if the body is None. Instead, handle the None case gracefully to prevent unexpected crashes.

Apply this diff to handle the Option safely and propagate errors appropriately:

- let lock_batch_response_body =
-     LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
+ let body = response.get_body().ok_or_else(|| {
+     BrokerError::MQBrokerError(
+         response.code(),
+         "Response body is empty".to_string(),
+         "".to_string(),
+     )
+ })?;
+ let lock_batch_response_body = LockBatchResponseBody::decode(body)?;

This modification ensures that if the response body is None, an appropriate error is returned. Additionally, using the ? operator on the decode method propagates any decoding errors without panicking.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
Ok(lock_batch_response_body.lock_ok_mq_set)
let body = response.get_body().ok_or_else(|| {
BrokerError::MQBrokerError(
response.code(),
"Response body is empty".to_string(),
"".to_string(),
)
})?;
let lock_batch_response_body = LockBatchResponseBody::decode(body)?;
Ok(lock_batch_response_body.lock_ok_mq_set)

} else {
Err(BrokerError::MQBrokerError(
response.code(),
response.remark().cloned().unwrap_or("".to_string()),
"".to_string(),
))

Check warning on line 356 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L352-L356

Added lines #L352 - L356 were not covered by tests
}
}
Comment on lines +336 to +358
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve logging and error messages for better debugging

When the response code is not ResponseCode::Success, the error returned lacks detailed context. Enhancing the error message with additional information can aid in debugging and provide clearer insights into failures.

Consider including the address and any relevant request details in the error:

Err(BrokerError::MQBrokerError(
    response.code(),
    response.remark().cloned().unwrap_or_else(|| "Unknown error".to_string()),
    "".to_string(),
))
+ .context(format!(
+     "Failed to lock batch MQ at addr: {}, request: {:?}",
+     addr, request
+ ))

This addition provides more context about where and why the failure occurred.

Committable suggestion was skipped due to low confidence.

Err(e) => Err(BrokerClientError(e)),

Check warning on line 359 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L358-L359

Added lines #L358 - L359 were not covered by tests
}
}

Check warning on line 361 in rocketmq-broker/src/out_api/broker_outer_api.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/out_api/broker_outer_api.rs#L361

Added line #L361 was not covered by tests
}

fn dns_lookup_address_by_domain(domain: &str) -> Vec<String> {
Expand Down
Loading
Loading