diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 6adca4c11..43dcf767c 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -30,6 +30,7 @@ use rocketmq_common::common::statistics::state_getter::StateGetter; use rocketmq_common::ArcRefCellWrapper; use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::UtilAll::compute_next_morning_time_millis; +use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup; use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper; use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper; use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; @@ -114,6 +115,7 @@ pub(crate) struct BrokerRuntime { #[cfg(feature = "local_file_store")] pull_request_hold_service: Option>, rebalance_lock_manager: Arc, + broker_member_group: Arc, } impl Clone for BrokerRuntime { @@ -147,6 +149,7 @@ impl Clone for BrokerRuntime { is_isolated: self.is_isolated.clone(), pull_request_hold_service: self.pull_request_hold_service.clone(), rebalance_lock_manager: self.rebalance_lock_manager.clone(), + broker_member_group: self.broker_member_group.clone(), } } } @@ -190,6 +193,14 @@ impl BrokerRuntime { })); let broker_stats_manager = Arc::new(stats_manager); consumer_manager.set_broker_stats_manager(Some(Arc::downgrade(&broker_stats_manager))); + let mut broker_member_group = BrokerMemberGroup::new( + broker_config.broker_identity.broker_cluster_name.clone(), + broker_config.broker_identity.broker_name.clone(), + ); + broker_member_group.broker_addrs.insert( + broker_config.broker_identity.broker_id, + broker_config.get_broker_addr(), + ); Self { broker_config: broker_config.clone(), message_store_config, @@ -222,6 +233,7 @@ impl BrokerRuntime { is_isolated: Arc::new(AtomicBool::new(false)), pull_request_hold_service: None, rebalance_lock_manager: Arc::new(Default::default()), + broker_member_group: Arc::new(broker_member_group), } } @@ -471,6 +483,8 @@ impl BrokerRuntime { self.consumer_manager.clone(), self.broker_out_api.clone(), self.broker_stats_manager.clone(), + self.rebalance_lock_manager.clone(), + self.broker_member_group.clone(), ); BrokerRequestProcessor { diff --git a/rocketmq-broker/src/client/net/broker_to_client.rs b/rocketmq-broker/src/client/net/broker_to_client.rs index e6dda7e29..26b27d8ec 100644 --- a/rocketmq-broker/src/client/net/broker_to_client.rs +++ b/rocketmq-broker/src/client/net/broker_to_client.rs @@ -18,7 +18,7 @@ use rocketmq_remoting::net::channel::Channel; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use crate::error::BrokerError::BrokerClientError; -use crate::BrokerResult; +use crate::Result; #[derive(Default, Clone)] pub struct Broker2Client; @@ -29,7 +29,7 @@ impl Broker2Client { channel: &mut Channel, request: RemotingCommand, timeout_millis: u64, - ) -> BrokerResult { + ) -> Result { match channel.send_wait_response(request, timeout_millis).await { Ok(value) => Ok(value), Err(e) => Err(BrokerClientError(e)), diff --git a/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs b/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs index a2b2de4e6..0ad5b68ad 100644 --- a/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs +++ b/rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs @@ -15,6 +15,7 @@ * limitations under the License. */ use std::collections::HashMap; +use std::collections::HashSet; use std::sync::atomic::AtomicI64; use std::sync::Arc; @@ -34,7 +35,7 @@ lazy_static! { }; } -type MessageQueueLockTable = HashMap, LockEntry>>; +type MessageQueueLockTable = HashMap>; #[derive(Clone, Default)] pub struct RebalanceLockManager { @@ -60,26 +61,25 @@ impl RebalanceLockManager { pub fn try_lock_batch( &self, group: &str, - mqs: Vec>, + mqs: &HashSet, client_id: &str, - ) -> Vec> { - let mut lock_mqs = Vec::new(); - let mut not_locked_mqs = Vec::new(); + ) -> HashSet { + let mut lock_mqs = HashSet::with_capacity(mqs.len()); + let mut not_locked_mqs = HashSet::with_capacity(mqs.len()); for mq in mqs.iter() { if self.is_locked(group, mq, client_id) { - lock_mqs.push(mq.clone()); + lock_mqs.insert(mq.clone()); } else { - not_locked_mqs.push(mq.clone()); + not_locked_mqs.insert(mq.clone()); } } if !not_locked_mqs.is_empty() { let mut write_guard = self.mq_lock_table.write(); - let mut group_value = write_guard.get_mut(group); - if group_value.is_none() { - group_value = Some(write_guard.entry(group.to_string()).or_default()); - } - let group_value = group_value.unwrap(); - for mq in not_locked_mqs.iter() { + let group_value = write_guard + .entry(group.to_string()) + .or_insert(HashMap::with_capacity(32)); + + for mq in not_locked_mqs { let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| { info!( "RebalanceLockManager#tryLockBatch: lock a message which has not been \ @@ -96,7 +96,7 @@ impl RebalanceLockManager { get_current_millis() as i64, std::sync::atomic::Ordering::Relaxed, ); - lock_mqs.push(mq.clone()); + lock_mqs.insert(mq); continue; } let old_client_id = lock_entry.client_id.as_str().to_string(); @@ -106,12 +106,12 @@ impl RebalanceLockManager { get_current_millis() as i64, std::sync::atomic::Ordering::Relaxed, ); - lock_mqs.push(mq.clone()); warn!( "RebalanceLockManager#tryLockBatch: try to lock a expired message queue, \ group={} mq={:?}, old client id={}, new client id={}", group, mq, old_client_id, client_id ); + lock_mqs.insert(mq); continue; } warn!( @@ -124,7 +124,7 @@ impl RebalanceLockManager { lock_mqs } - pub fn unlock_batch(&self, group: &str, mqs: Vec>, client_id: &str) { + pub fn unlock_batch(&self, group: &str, mqs: &HashSet, client_id: &str) { let mut write_guard = self.mq_lock_table.write(); let group_value = write_guard.get_mut(group); if group_value.is_none() { @@ -163,7 +163,7 @@ impl RebalanceLockManager { } } - fn is_locked(&self, group: &str, mq: &Arc, client_id: &str) -> bool { + fn is_locked(&self, group: &str, mq: &MessageQueue, client_id: &str) -> bool { let lock_table = self.mq_lock_table.read(); let group_value = lock_table.get(group); if group_value.is_none() { @@ -231,59 +231,71 @@ mod rebalance_lock_manager_tests { #[test] fn lock_all_expired_returns_false_when_active_locks_exist() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); - manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); + let mq = MessageQueue::default(); + let mut set = HashSet::new(); + set.insert(mq.clone()); + manager.try_lock_batch("test_group", &set, "client_1"); assert!(!manager.is_lock_all_expired("test_group")); } #[test] fn try_lock_batch_locks_message_queues_for_new_group() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); - let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); + let mq = MessageQueue::default(); + let mut set = HashSet::new(); + set.insert(mq.clone()); + let locked_mqs = manager.try_lock_batch("test_group", &set, "client_1"); assert_eq!(locked_mqs.len(), 1); } #[test] fn try_lock_batch_does_not_lock_already_locked_message_queues() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); - manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); - let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2"); + let mq = MessageQueue::default(); + let mut set = HashSet::new(); + set.insert(mq.clone()); + manager.try_lock_batch("test_group", &set, "client_1"); + let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2"); assert!(locked_mqs.is_empty()); } #[test] fn unlock_batch_unlocks_message_queues_locked_by_client() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); - manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); - manager.unlock_batch("test_group", vec![mq.clone()], "client_1"); - let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2"); + let mq = MessageQueue::default(); + let mut set = HashSet::new(); + set.insert(mq.clone()); + manager.try_lock_batch("test_group", &set, "client_1"); + manager.unlock_batch("test_group", &set, "client_1"); + let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2"); assert_eq!(locked_mqs.len(), 1); } #[test] fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); - manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); - manager.unlock_batch("test_group", vec![mq.clone()], "client_2"); + let mq = MessageQueue::default(); + let mut set = HashSet::new(); + set.insert(mq.clone()); + manager.try_lock_batch("test_group", &set, "client_1"); + manager.unlock_batch("test_group", &set, "client_2"); assert!(!manager.is_lock_all_expired("test_group")); } #[test] fn is_locked_returns_true_for_locked_message_queue() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); - manager.try_lock_batch("test_group", vec![mq.clone()], "client_1"); + let mq = MessageQueue::default(); + let mut set = HashSet::new(); + set.insert(mq.clone()); + manager.try_lock_batch("test_group", &set, "client_1"); assert!(manager.is_locked("test_group", &mq, "client_1")); } #[test] fn is_locked_returns_false_for_unlocked_message_queue() { let manager = RebalanceLockManager::default(); - let mq = Arc::new(MessageQueue::default()); + let mq = MessageQueue::default(); assert!(!manager.is_locked("test_group", &mq, "client_1")); } } diff --git a/rocketmq-broker/src/error.rs b/rocketmq-broker/src/error.rs index 34228bb2b..b2ca3ce40 100644 --- a/rocketmq-broker/src/error.rs +++ b/rocketmq-broker/src/error.rs @@ -20,4 +20,7 @@ use thiserror::Error; pub enum BrokerError { #[error("broker client error: {0}")] BrokerClientError(#[from] rocketmq_remoting::error::Error), + + #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")] + MQBrokerError(i32, String, String), } diff --git a/rocketmq-broker/src/lib.rs b/rocketmq-broker/src/lib.rs index 8ad57b80c..0869fbf57 100644 --- a/rocketmq-broker/src/lib.rs +++ b/rocketmq-broker/src/lib.rs @@ -47,4 +47,4 @@ pub(crate) mod topic; pub(crate) mod util; type RemotingError = rocketmq_remoting::error::Error; -type BrokerResult = Result; +type Result = std::result::Result; diff --git a/rocketmq-broker/src/out_api/broker_outer_api.rs b/rocketmq-broker/src/out_api/broker_outer_api.rs index a6505d878..20d6acb48 100644 --- a/rocketmq-broker/src/out_api/broker_outer_api.rs +++ b/rocketmq-broker/src/out_api/broker_outer_api.rs @@ -14,12 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashSet; use std::sync::Arc; use std::sync::Weak; use dns_lookup::lookup_host; use rocketmq_common::common::broker::broker_config::BrokerIdentity; use rocketmq_common::common::config::TopicConfig; +use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::utils::crc32_utils; use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; use rocketmq_common::ArcRefCellWrapper; @@ -29,7 +31,9 @@ use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody; use rocketmq_remoting::protocol::body::kv_table::KVTable; +use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody; use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper; +use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader; use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader; use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader; use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader; @@ -37,6 +41,7 @@ use rocketmq_remoting::protocol::namesrv::RegisterBrokerResult; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::protocol::route::route_data_view::QueueData; use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; +use rocketmq_remoting::protocol::RemotingDeserializable; use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::remoting::RemotingService; use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor; @@ -48,6 +53,10 @@ use tracing::debug; use tracing::error; use tracing::info; +use crate::error::BrokerError; +use crate::error::BrokerError::BrokerClientError; +use crate::Result; + pub struct BrokerOuterAPI { remoting_client: ArcRefCellWrapper>, name_server_address: Option, @@ -317,6 +326,39 @@ impl BrokerOuterAPI { pub fn rpc_client(&self) -> &RpcClientImpl { &self.rpc_client } + + pub async fn lock_batch_mq_async( + &self, + addr: String, + request_body: bytes::Bytes, + timeout_millis: u64, + ) -> Result> { + let mut request = RemotingCommand::create_request_command( + RequestCode::LockBatchMq, + LockBatchMqRequestHeader::default(), + ); + request.set_body_mut_ref(Some(request_body)); + let result = self + .remoting_client + .invoke_async(Some(addr), request, timeout_millis) + .await; + match result { + Ok(response) => { + if ResponseCode::from(response.code()) == ResponseCode::Success { + let lock_batch_response_body = + LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap(); + Ok(lock_batch_response_body.lock_ok_mq_set) + } else { + Err(BrokerError::MQBrokerError( + response.code(), + response.remark().cloned().unwrap_or("".to_string()), + "".to_string(), + )) + } + } + Err(e) => Err(BrokerClientError(e)), + } + } } fn dns_lookup_address_by_domain(domain: &str) -> Vec { diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index f24d3c480..b558389cd 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -21,6 +21,7 @@ use rocketmq_common::common::server::config::ServerConfig; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; use rocketmq_store::config::message_store_config::MessageStoreConfig; @@ -30,8 +31,10 @@ use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager; use tracing::warn; use crate::client::manager::consumer_manager::ConsumerManager; +use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager; use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; use crate::out_api::broker_outer_api::BrokerOuterAPI; +use crate::processor::admin_broker_processor::batch_mq_handler::BatchMqHandler; use crate::processor::admin_broker_processor::broker_config_request_handler::BrokerConfigRequestHandler; use crate::processor::admin_broker_processor::consumer_request_handler::ConsumerRequestHandler; use crate::processor::admin_broker_processor::offset_request_handler::OffsetRequestHandler; @@ -41,6 +44,7 @@ use crate::schedule::schedule_message_service::ScheduleMessageService; use crate::topic::manager::topic_config_manager::TopicConfigManager; use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager; +mod batch_mq_handler; mod broker_config_request_handler; mod consumer_request_handler; mod offset_request_handler; @@ -52,6 +56,7 @@ pub struct AdminBrokerProcessor { broker_config_request_handler: BrokerConfigRequestHandler, consumer_request_handler: ConsumerRequestHandler, offset_request_handler: OffsetRequestHandler, + batch_mq_handler: BatchMqHandler, } impl AdminBrokerProcessor { @@ -68,6 +73,8 @@ impl AdminBrokerProcessor { consume_manager: Arc, broker_out_api: Arc, broker_stats_manager: Arc, + rebalance_lock_manager: Arc, + broker_member_group: Arc, ) -> Self { let inner = Inner { broker_config, @@ -83,16 +90,20 @@ impl AdminBrokerProcessor { consume_manager, broker_out_api, broker_stats_manager, + rebalance_lock_manager, + broker_member_group, }; let topic_request_handler = TopicRequestHandler::new(inner.clone()); let broker_config_request_handler = BrokerConfigRequestHandler::new(inner.clone()); let consumer_request_handler = ConsumerRequestHandler::new(inner.clone()); let offset_request_handler = OffsetRequestHandler::new(inner.clone()); + let batch_mq_handler = BatchMqHandler::new(inner.clone()); AdminBrokerProcessor { topic_request_handler, broker_config_request_handler, consumer_request_handler, offset_request_handler, + batch_mq_handler, } } } @@ -192,6 +203,12 @@ impl AdminBrokerProcessor { .await } + RequestCode::LockBatchMq => { + self.batch_mq_handler + .lock_natch_mq(channel, ctx, request_code, request) + .await + } + _ => Some(get_unknown_cmd_response(request_code)), } } @@ -224,4 +241,6 @@ struct Inner { consume_manager: Arc, broker_out_api: Arc, broker_stats_manager: Arc, + rebalance_lock_manager: Arc, + broker_member_group: Arc, } diff --git a/rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs new file mode 100644 index 000000000..0f5e7a46f --- /dev/null +++ b/rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; +use rocketmq_remoting::code::request_code::RequestCode; +use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; +use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody; +use rocketmq_remoting::protocol::remoting_command::RemotingCommand; +use rocketmq_remoting::protocol::RemotingDeserializable; +use rocketmq_remoting::protocol::RemotingSerializable; +use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +use rocketmq_rust::CountDownLatch; +use tokio::sync::Mutex; +use tracing::warn; + +use crate::processor::admin_broker_processor::Inner; + +#[derive(Clone)] +pub(super) struct BatchMqHandler { + inner: Inner, +} + +impl BatchMqHandler { + pub(super) fn new(inner: Inner) -> Self { + Self { inner } + } + + pub async fn lock_natch_mq( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + request: RemotingCommand, + ) -> Option { + let mut request_body = LockBatchRequestBody::decode(request.get_body().unwrap()).unwrap(); + let mut lock_ok_mqset = HashSet::new(); + let self_lock_okmqset = self.inner.rebalance_lock_manager.try_lock_batch( + request_body.consumer_group.as_ref().unwrap(), + &request_body.mq_set, + request_body.client_id.as_ref().unwrap(), + ); + if request_body.only_this_broker || !self.inner.broker_config.lock_in_strict_mode { + lock_ok_mqset = self_lock_okmqset; + } else { + request_body.only_this_broker = true; + let replica_size = self.inner.message_store_config.total_replicas; + let quorum = replica_size / 2 + 1; + if quorum <= 1 { + lock_ok_mqset = self_lock_okmqset; + } else { + let mut mq_lock_map = HashMap::with_capacity(32); + for mq in self_lock_okmqset.iter() { + *mq_lock_map.entry(mq.clone()).or_insert(0) += 1; + } + let mut addr_map = HashMap::with_capacity(8); + addr_map.extend(self.inner.broker_member_group.broker_addrs.clone()); + addr_map.remove(&self.inner.broker_config.broker_identity.broker_id); + + let count_down_latch = CountDownLatch::new(addr_map.len() as u32); + let request_body = Bytes::from(request_body.encode()); + let mq_lock_map_arc = Arc::new(Mutex::new(mq_lock_map.clone())); + for (_, broker_addr) in addr_map { + let count_down_latch = count_down_latch.clone(); + let broker_outer_api = self.inner.broker_out_api.clone(); + let mq_lock_map = mq_lock_map_arc.clone(); + let request_body_cloned = request_body.clone(); + tokio::spawn(async move { + let result = broker_outer_api + .lock_batch_mq_async(broker_addr, request_body_cloned, 1000) + .await; + match result { + Ok(lock_ok_mqs) => { + let mut mq_lock_map = mq_lock_map.lock().await; + for mq in lock_ok_mqs { + *mq_lock_map.entry(mq).or_insert(0) += 1; + } + } + Err(e) => { + warn!("lockBatchMQAsync failed: {:?}", e); + } + } + count_down_latch.count_down().await; + }); + } + count_down_latch.wait_timeout(Duration::from_secs(2)).await; + + let mq_lock_map = mq_lock_map_arc.lock().await; + for (mq, count) in &*mq_lock_map { + if *count >= quorum { + lock_ok_mqset.insert(mq.clone()); + } + } + } + } + let response_body = LockBatchResponseBody { + lock_ok_mq_set: lock_ok_mqset, + }; + Some(RemotingCommand::create_response_command().set_body(Some(response_body.encode()))) + } + + pub async fn unlock_batch_mq( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + _request: RemotingCommand, + ) -> Option { + unimplemented!("unlockBatchMQ") + } +} diff --git a/rocketmq-client/src/error.rs b/rocketmq-client/src/error.rs index f6d541485..9a4d1140c 100644 --- a/rocketmq-client/src/error.rs +++ b/rocketmq-client/src/error.rs @@ -25,7 +25,7 @@ pub enum MQClientError { #[error("{0}")] RemotingTooMuchRequestError(String), - #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")] + #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")] MQBrokerError(i32, String, String), #[error("Client exception occurred: CODE:{0}, Message:{1}")] diff --git a/rocketmq-common/src/common/broker/broker_config.rs b/rocketmq-common/src/common/broker/broker_config.rs index 6556c2844..0c462cc25 100644 --- a/rocketmq-common/src/common/broker/broker_config.rs +++ b/rocketmq-common/src/common/broker/broker_config.rs @@ -169,6 +169,7 @@ pub struct BrokerConfig { pub auto_delete_unused_stats: bool, pub forward_timeout: u64, pub store_reply_message_enable: bool, + pub lock_in_strict_mode: bool, } impl Default for BrokerConfig { @@ -245,6 +246,7 @@ impl Default for BrokerConfig { enable_mixed_message_type: false, auto_delete_unused_stats: false, store_reply_message_enable: true, + lock_in_strict_mode: false, } } } diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index c2f7cb63b..051793622 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -648,15 +648,15 @@ impl RouteInfoManager { cluster_name: &str, broker_name: &str, ) -> Option { + let mut group_member = + BrokerMemberGroup::new(cluster_name.to_string(), broker_name.to_string()); if let Some(broker_data) = self.broker_addr_table.get(broker_name) { let map = broker_data.broker_addrs().clone(); - return Some(BrokerMemberGroup::new( - Some(cluster_name.to_string()), - Some(broker_name.to_string()), - Some(map), - )); + for (key, value) in map { + group_member.broker_addrs.insert(key as u64, value); + } } - None + Some(group_member) } pub(crate) fn wipe_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 { diff --git a/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs b/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs index b8a120913..1c5f8c9df 100644 --- a/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs +++ b/rocketmq-remoting/src/protocol/body/broker_body/broker_member_group.rs @@ -22,21 +22,17 @@ use serde::Serialize; #[derive(Debug, Clone, Serialize, Deserialize, Default)] #[serde(rename_all = "camelCase")] pub struct BrokerMemberGroup { - pub cluster: Option, - pub broker_name: Option, - pub broker_addrs: Option>, + pub cluster: String, + pub broker_name: String, + pub broker_addrs: HashMap, } impl BrokerMemberGroup { - pub fn new( - cluster: Option, - broker_name: Option, - broker_addrs: Option>, - ) -> Self { + pub fn new(cluster: String, broker_name: String) -> Self { Self { cluster, broker_name, - broker_addrs, + broker_addrs: HashMap::new(), } } } diff --git a/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs b/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs index 712b2f57c..16de29586 100644 --- a/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs +++ b/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs @@ -21,7 +21,7 @@ use rocketmq_common::common::message::message_queue::MessageQueue; use serde::Deserialize; use serde::Serialize; -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(rename_all = "camelCase")] pub struct LockBatchRequestBody { pub consumer_group: Option,