-
Notifications
You must be signed in to change notification settings - Fork 179
[ISSUE ##1052] Support request code LOCK_BATCH_MQ(41) #1061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| * limitations under the License. | ||
| */ | ||
| use std::collections::HashMap; | ||
| use std::collections::HashSet; | ||
| use std::sync::atomic::AtomicI64; | ||
| use std::sync::Arc; | ||
|
|
||
|
|
@@ -34,7 +35,7 @@ | |
| }; | ||
| } | ||
|
|
||
| type MessageQueueLockTable = HashMap<String, HashMap<Arc<MessageQueue>, LockEntry>>; | ||
| type MessageQueueLockTable = HashMap<String, HashMap<MessageQueue, LockEntry>>; | ||
|
|
||
| #[derive(Clone, Default)] | ||
| pub struct RebalanceLockManager { | ||
|
|
@@ -60,26 +61,25 @@ | |
| pub fn try_lock_batch( | ||
| &self, | ||
| group: &str, | ||
| mqs: Vec<Arc<MessageQueue>>, | ||
| mqs: &HashSet<MessageQueue>, | ||
| client_id: &str, | ||
| ) -> Vec<Arc<MessageQueue>> { | ||
| let mut lock_mqs = Vec::new(); | ||
| let mut not_locked_mqs = Vec::new(); | ||
| ) -> HashSet<MessageQueue> { | ||
| let mut lock_mqs = HashSet::with_capacity(mqs.len()); | ||
| let mut not_locked_mqs = HashSet::with_capacity(mqs.len()); | ||
|
Comment on lines
+67
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Avoid unnecessary cloning of Cloning Apply this diff to reduce cloning: for mq in mqs.iter() {
if self.is_locked(group, mq, client_id) {
- lock_mqs.insert(mq.clone());
+ lock_mqs.insert((*mq).clone());
} else {
- not_locked_mqs.insert(mq.clone());
+ not_locked_mqs.insert((*mq).clone());
}
}
|
||
| for mq in mqs.iter() { | ||
| if self.is_locked(group, mq, client_id) { | ||
| lock_mqs.push(mq.clone()); | ||
| lock_mqs.insert(mq.clone()); | ||
| } else { | ||
| not_locked_mqs.push(mq.clone()); | ||
| not_locked_mqs.insert(mq.clone()); | ||
| } | ||
| } | ||
| if !not_locked_mqs.is_empty() { | ||
| let mut write_guard = self.mq_lock_table.write(); | ||
| let mut group_value = write_guard.get_mut(group); | ||
| if group_value.is_none() { | ||
| group_value = Some(write_guard.entry(group.to_string()).or_default()); | ||
| } | ||
| let group_value = group_value.unwrap(); | ||
| for mq in not_locked_mqs.iter() { | ||
| let group_value = write_guard | ||
| .entry(group.to_string()) | ||
| .or_insert(HashMap::with_capacity(32)); | ||
|
|
||
| for mq in not_locked_mqs { | ||
| let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| { | ||
| info!( | ||
| "RebalanceLockManager#tryLockBatch: lock a message which has not been \ | ||
|
|
@@ -96,7 +96,7 @@ | |
| get_current_millis() as i64, | ||
| std::sync::atomic::Ordering::Relaxed, | ||
| ); | ||
| lock_mqs.push(mq.clone()); | ||
| lock_mqs.insert(mq); | ||
| continue; | ||
| } | ||
| let old_client_id = lock_entry.client_id.as_str().to_string(); | ||
|
|
@@ -106,12 +106,12 @@ | |
| get_current_millis() as i64, | ||
| std::sync::atomic::Ordering::Relaxed, | ||
| ); | ||
| lock_mqs.push(mq.clone()); | ||
| warn!( | ||
| "RebalanceLockManager#tryLockBatch: try to lock a expired message queue, \ | ||
| group={} mq={:?}, old client id={}, new client id={}", | ||
| group, mq, old_client_id, client_id | ||
| ); | ||
| lock_mqs.insert(mq); | ||
| continue; | ||
| } | ||
| warn!( | ||
|
|
@@ -124,7 +124,7 @@ | |
| lock_mqs | ||
| } | ||
|
|
||
| pub fn unlock_batch(&self, group: &str, mqs: Vec<Arc<MessageQueue>>, client_id: &str) { | ||
| pub fn unlock_batch(&self, group: &str, mqs: &HashSet<MessageQueue>, client_id: &str) { | ||
| let mut write_guard = self.mq_lock_table.write(); | ||
| let group_value = write_guard.get_mut(group); | ||
| if group_value.is_none() { | ||
|
|
@@ -163,7 +163,7 @@ | |
| } | ||
| } | ||
|
|
||
| fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool { | ||
| fn is_locked(&self, group: &str, mq: &MessageQueue, client_id: &str) -> bool { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confirm thread safety without By removing |
||
| let lock_table = self.mq_lock_table.read(); | ||
| let group_value = lock_table.get(group); | ||
| if group_value.is_none() { | ||
|
|
@@ -231,59 +231,71 @@ | |
| #[test] | ||
| fn lock_all_expired_returns_false_when_active_locks_exist() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| let mq = MessageQueue::default(); | ||
| let mut set = HashSet::new(); | ||
| set.insert(mq.clone()); | ||
| manager.try_lock_batch("test_group", &set, "client_1"); | ||
|
Comment on lines
+234
to
+237
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Replace repetitive test setup code with a helper function The test cases repeatedly create a default Implement a helper function for creating the test fn create_test_mq_set() -> HashSet<MessageQueue> {
let mq = MessageQueue::default();
let mut set = HashSet::new();
set.insert(mq);
set
}Then update the tests: #[test]
fn lock_all_expired_returns_false_when_active_locks_exist() {
let manager = RebalanceLockManager::default();
- let mq = MessageQueue::default();
- let mut set = HashSet::new();
- set.insert(mq.clone());
+ let set = create_test_mq_set();
manager.try_lock_batch("test_group", &set, "client_1");
assert!(!manager.is_lock_all_expired("test_group"));
}Repeat for other test cases. Also applies to: 244-247, 254-258, 265-270, 277-281, 288-291, 298-298 |
||
| assert!(!manager.is_lock_all_expired("test_group")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn try_lock_batch_locks_message_queues_for_new_group() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| let mq = MessageQueue::default(); | ||
| let mut set = HashSet::new(); | ||
| set.insert(mq.clone()); | ||
| let locked_mqs = manager.try_lock_batch("test_group", &set, "client_1"); | ||
| assert_eq!(locked_mqs.len(), 1); | ||
| } | ||
|
|
||
| #[test] | ||
| fn try_lock_batch_does_not_lock_already_locked_message_queues() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2"); | ||
| let mq = MessageQueue::default(); | ||
| let mut set = HashSet::new(); | ||
| set.insert(mq.clone()); | ||
| manager.try_lock_batch("test_group", &set, "client_1"); | ||
| let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2"); | ||
| assert!(locked_mqs.is_empty()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn unlock_batch_unlocks_message_queues_locked_by_client() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| manager.unlock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2"); | ||
| let mq = MessageQueue::default(); | ||
| let mut set = HashSet::new(); | ||
| set.insert(mq.clone()); | ||
| manager.try_lock_batch("test_group", &set, "client_1"); | ||
| manager.unlock_batch("test_group", &set, "client_1"); | ||
| let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2"); | ||
| assert_eq!(locked_mqs.len(), 1); | ||
| } | ||
|
|
||
| #[test] | ||
| fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| manager.unlock_batch("test_group", vec![mq.clone()], "client_2"); | ||
| let mq = MessageQueue::default(); | ||
| let mut set = HashSet::new(); | ||
| set.insert(mq.clone()); | ||
| manager.try_lock_batch("test_group", &set, "client_1"); | ||
| manager.unlock_batch("test_group", &set, "client_2"); | ||
| assert!(!manager.is_lock_all_expired("test_group")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn is_locked_returns_true_for_locked_message_queue() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); | ||
| let mq = MessageQueue::default(); | ||
| let mut set = HashSet::new(); | ||
| set.insert(mq.clone()); | ||
| manager.try_lock_batch("test_group", &set, "client_1"); | ||
| assert!(manager.is_locked("test_group", &mq, "client_1")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn is_locked_returns_false_for_unlocked_message_queue() { | ||
| let manager = RebalanceLockManager::default(); | ||
| let mq = Arc::new(MessageQueue::default()); | ||
| let mq = MessageQueue::default(); | ||
| assert!(!manager.is_locked("test_group", &mq, "client_1")); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,4 +47,4 @@ pub(crate) mod topic; | |
| pub(crate) mod util; | ||
|
|
||
| type RemotingError = rocketmq_remoting::error::Error; | ||
| type BrokerResult<T> = Result<T, BrokerError>; | ||
| type Result<T> = std::result::Result<T, BrokerError>; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Remaining usages of Please update the following files to replace
🔗 Analysis chainApprove the type alias change and verify its usage. The change from To ensure this change doesn't break existing code, please run the following script to verify the usage of both Please review the script output and update any remaining occurrences of 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Verify the usage of BrokerResult and Result types
# Test 1: Check for any remaining usage of BrokerResult
echo "Checking for remaining usage of BrokerResult:"
rg --type rust 'BrokerResult'
# Test 2: Verify the usage of the new Result type
echo "Verifying usage of the new Result type:"
rg --type rust 'Result<.*>'
# Note: If Test 1 returns results, those occurrences need to be updated to use the new Result type.
# If Test 2 doesn't return results (other than in lib.rs), it might indicate that the new type isn't being used yet.
Length of output: 48328 |
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,12 +14,14 @@ | |||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||
| use std::collections::HashSet; | ||||||||||||||||||||||||
| use std::sync::Arc; | ||||||||||||||||||||||||
| use std::sync::Weak; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| use dns_lookup::lookup_host; | ||||||||||||||||||||||||
| use rocketmq_common::common::broker::broker_config::BrokerIdentity; | ||||||||||||||||||||||||
| use rocketmq_common::common::config::TopicConfig; | ||||||||||||||||||||||||
| use rocketmq_common::common::message::message_queue::MessageQueue; | ||||||||||||||||||||||||
| use rocketmq_common::utils::crc32_utils; | ||||||||||||||||||||||||
| use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; | ||||||||||||||||||||||||
| use rocketmq_common::ArcRefCellWrapper; | ||||||||||||||||||||||||
|
|
@@ -29,14 +31,17 @@ | |||||||||||||||||||||||
| use rocketmq_remoting::code::response_code::ResponseCode; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::body::kv_table::KVTable; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::namesrv::RegisterBrokerResult; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::remoting_command::RemotingCommand; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::route::route_data_view::QueueData; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::RemotingDeserializable; | ||||||||||||||||||||||||
| use rocketmq_remoting::protocol::RemotingSerializable; | ||||||||||||||||||||||||
| use rocketmq_remoting::remoting::RemotingService; | ||||||||||||||||||||||||
| use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor; | ||||||||||||||||||||||||
|
|
@@ -48,6 +53,10 @@ | |||||||||||||||||||||||
| use tracing::error; | ||||||||||||||||||||||||
| use tracing::info; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| use crate::error::BrokerError; | ||||||||||||||||||||||||
| use crate::error::BrokerError::BrokerClientError; | ||||||||||||||||||||||||
| use crate::Result; | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| pub struct BrokerOuterAPI { | ||||||||||||||||||||||||
| remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>, | ||||||||||||||||||||||||
| name_server_address: Option<String>, | ||||||||||||||||||||||||
|
|
@@ -317,6 +326,39 @@ | |||||||||||||||||||||||
| pub fn rpc_client(&self) -> &RpcClientImpl { | ||||||||||||||||||||||||
| &self.rpc_client | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| pub async fn lock_batch_mq_async( | ||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||
| addr: String, | ||||||||||||||||||||||||
| request_body: bytes::Bytes, | ||||||||||||||||||||||||
| timeout_millis: u64, | ||||||||||||||||||||||||
| ) -> Result<HashSet<MessageQueue>> { | ||||||||||||||||||||||||
| let mut request = RemotingCommand::create_request_command( | ||||||||||||||||||||||||
| RequestCode::LockBatchMq, | ||||||||||||||||||||||||
| LockBatchMqRequestHeader::default(), | ||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||
| request.set_body_mut_ref(Some(request_body)); | ||||||||||||||||||||||||
| let result = self | ||||||||||||||||||||||||
| .remoting_client | ||||||||||||||||||||||||
| .invoke_async(Some(addr), request, timeout_millis) | ||||||||||||||||||||||||
| .await; | ||||||||||||||||||||||||
| match result { | ||||||||||||||||||||||||
| Ok(response) => { | ||||||||||||||||||||||||
| if ResponseCode::from(response.code()) == ResponseCode::Success { | ||||||||||||||||||||||||
| let lock_batch_response_body = | ||||||||||||||||||||||||
| LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap(); | ||||||||||||||||||||||||
| Ok(lock_batch_response_body.lock_ok_mq_set) | ||||||||||||||||||||||||
|
Comment on lines
+349
to
+350
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid using Using Apply this diff to handle the - let lock_batch_response_body =
- LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
+ let body = response.get_body().ok_or_else(|| {
+ BrokerError::MQBrokerError(
+ response.code(),
+ "Response body is empty".to_string(),
+ "".to_string(),
+ )
+ })?;
+ let lock_batch_response_body = LockBatchResponseBody::decode(body)?;This modification ensures that if the response body is 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||
| Err(BrokerError::MQBrokerError( | ||||||||||||||||||||||||
| response.code(), | ||||||||||||||||||||||||
| response.remark().cloned().unwrap_or("".to_string()), | ||||||||||||||||||||||||
| "".to_string(), | ||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
+336
to
+358
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve logging and error messages for better debugging When the response code is not Consider including the address and any relevant request details in the error: Err(BrokerError::MQBrokerError(
response.code(),
response.remark().cloned().unwrap_or_else(|| "Unknown error".to_string()),
"".to_string(),
))
+ .context(format!(
+ "Failed to lock batch MQ at addr: {}, request: {:?}",
+ addr, request
+ ))This addition provides more context about where and why the failure occurred.
|
||||||||||||||||||||||||
| Err(e) => Err(BrokerClientError(e)), | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| fn dns_lookup_address_by_domain(domain: &str) -> Vec<String> { | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure
MessageQueueimplementsHashandEqcorrectly for use asHashMapkeysBy changing the
MessageQueueLockTableto useMessageQueuedirectly as keys instead ofArc<MessageQueue>, it's essential to ensure thatMessageQueueproperly implements theHashandEqtraits. This guarantees correct behavior when storing and retrieving entries from theHashMap, preventing potential issues with key collisions or incorrect lookups.