diff --git a/Cargo.lock b/Cargo.lock index d62b8ec90..22b5f0639 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2395,6 +2395,7 @@ dependencies = [ "cron", "parking_lot", "rocketmq-error", + "serde", "tokio", "tokio-test", "tokio-util", @@ -2411,6 +2412,7 @@ dependencies = [ "bytes", "cheetah-string", "criterion", + "dashmap", "dirs", "futures-util", "lazy_static", diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 980aed607..60cb4e974 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -1334,7 +1334,7 @@ impl BrokerRuntime { } impl BrokerRuntimeInner { - pub async fn register_single_topic_all(&self, topic_config: TopicConfig) { + pub async fn register_single_topic_all(&self, topic_config: ArcMut) { let mut topic_config = topic_config; if !PermName::is_writeable(self.broker_config.broker_permission) || !PermName::is_readable(self.broker_config.broker_permission) @@ -1352,7 +1352,7 @@ impl BrokerRuntimeInner { pub async fn register_increment_broker_data( this: ArcMut>, - topic_config_list: Vec, + topic_config_list: Vec>, data_version: DataVersion, ) { let mut serialize_wrapper = TopicConfigAndMappingSerializeWrapper { @@ -1371,10 +1371,10 @@ impl BrokerRuntimeInner { { TopicConfig { perm: topic_config.perm & this.broker_config().broker_permission, - ..topic_config.clone() + ..topic_config.as_ref().clone() } } else { - topic_config.clone() + topic_config.as_ref().clone() }; topic_config_table.insert( register_topic_config.topic_name.as_ref().unwrap().clone(), @@ -1495,7 +1495,6 @@ impl StateGetter for ProducerStateGetter { .broker_runtime_inner .topic_config_manager() .topic_config_table() - .lock() .contains_key(NamespaceUtil::wrap_namespace(instance_id, topic).as_str()) { self.broker_runtime_inner @@ -1528,7 +1527,6 @@ impl StateGetter for ConsumerStateGetter { .broker_runtime_inner .topic_config_manager() .topic_config_table() - .lock() .contains_key(topic) { let topic_full_name = @@ -2340,7 +2338,7 @@ impl BrokerRuntimeInner { ) { let mut topic_config_table = HashMap::new(); let table = self.topic_config_manager().topic_config_table(); - for topic_config in table.lock().values() { + for topic_config in table.iter() { let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission) || !PermName::is_readable(self.broker_config.broker_permission) { @@ -2352,7 +2350,7 @@ impl BrokerRuntimeInner { ..TopicConfig::default() } } else { - topic_config.clone() + topic_config.as_ref().clone() }; topic_config_table.insert( new_topic_config.topic_name.as_ref().unwrap().clone(), diff --git a/rocketmq-broker/src/hook/batch_check_before_put_message.rs b/rocketmq-broker/src/hook/batch_check_before_put_message.rs index b222ce285..211d6a50e 100644 --- a/rocketmq-broker/src/hook/batch_check_before_put_message.rs +++ b/rocketmq-broker/src/hook/batch_check_before_put_message.rs @@ -14,13 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::collections::HashMap; use std::sync::Arc; use cheetah_string::CheetahString; +use dashmap::DashMap; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner; use rocketmq_common::common::message::MessageTrait; +use rocketmq_rust::ArcMut; use rocketmq_store::base::message_result::PutMessageResult; use rocketmq_store::hook::put_message_hook::PutMessageHook; use tracing::warn; @@ -28,13 +29,11 @@ use tracing::warn; use crate::util::hook_utils::HookUtils; pub struct BatchCheckBeforePutMessageHook { - topic_config_table: Arc>>, + topic_config_table: Arc>>, } impl BatchCheckBeforePutMessageHook { - pub fn new( - topic_config_table: Arc>>, - ) -> Self { + pub fn new(topic_config_table: Arc>>) -> Self { Self { topic_config_table } } } diff --git a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs index 1cb327723..1f2dc006a 100644 --- a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs +++ b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs @@ -167,7 +167,7 @@ impl ConsumerOrderInfoManager { if *queue_id == topic_config.read_queue_nums as i32 { queues_to_remove.push(*queue_id); info!( - "Queue not exist, Clean order info, {}:{}, {}", + "Queue not exist, Clean order info, {}:{}, {:?}", topic_at_group, order_info, topic_config ); continue; diff --git a/rocketmq-broker/src/out_api/broker_outer_api.rs b/rocketmq-broker/src/out_api/broker_outer_api.rs index d8506bd3a..3f5a74706 100644 --- a/rocketmq-broker/src/out_api/broker_outer_api.rs +++ b/rocketmq-broker/src/out_api/broker_outer_api.rs @@ -134,7 +134,10 @@ impl BrokerOuterAPI { } } - fn create_request(broker_name: CheetahString, topic_config: TopicConfig) -> RemotingCommand { + fn create_request( + broker_name: CheetahString, + topic_config: ArcMut, + ) -> RemotingCommand { let request_header = RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().cloned().unwrap()); let queue_data = QueueData::new( @@ -328,7 +331,7 @@ impl BrokerOuterAPI { pub async fn register_single_topic_all( &self, broker_name: CheetahString, - topic_config: TopicConfig, + topic_config: ArcMut, timeout_mills: u64, ) { let request = Self::create_request(broker_name, topic_config); diff --git a/rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs index 1754a38a2..74c342778 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs @@ -124,7 +124,7 @@ impl TopicRequestHandler { } }; - let mut topic_config = TopicConfig { + let topic_config = ArcMut::new(TopicConfig { topic_name: Some(topic.clone()), read_queue_nums: request_header.read_queue_nums as u32, write_queue_nums: request_header.write_queue_nums as u32, @@ -137,7 +137,7 @@ impl TopicRequestHandler { }, order: request_header.order, attributes, - }; + }); if topic_config.get_topic_message_type() == TopicMessageType::Mixed && !self .broker_runtime_inner @@ -154,10 +154,8 @@ impl TopicRequestHandler { let topic_config_origin = self .broker_runtime_inner .topic_config_manager() - .topic_config_table() - .lock() - .get(&topic) - .cloned(); + .get_topic_config(topic.as_str()) + .clone(); if topic_config_origin.is_some() && topic_config == topic_config_origin.unwrap() { info!( "Broker receive request to update or create topic={}, but topicConfig has no \ @@ -169,7 +167,7 @@ impl TopicRequestHandler { } self.broker_runtime_inner .topic_config_manager_mut() - .update_topic_config(&mut topic_config); + .update_topic_config(topic_config.clone()); if self .broker_runtime_inner @@ -179,7 +177,7 @@ impl TopicRequestHandler { self.broker_runtime_inner .topic_config_manager() .broker_runtime_inner() - .register_single_topic_all(topic_config) + .register_single_topic_all(topic_config.clone()) .await; } else { /* self.broker_runtime_inner @@ -257,10 +255,7 @@ impl TopicRequestHandler { let topic_config_origin = self .broker_runtime_inner .topic_config_manager() - .topic_config_table() - .lock() - .get(topic) - .cloned(); + .get_topic_config(topic); if topic_config_origin.is_some() && topic_config.clone() == topic_config_origin.unwrap() { info!( @@ -405,9 +400,7 @@ impl TopicRequestHandler { topic_config_table: self .broker_runtime_inner .topic_config_manager() - .topic_config_table() - .lock() - .clone(), + .topic_config_table_hash_map(), }, ..Default::default() }; @@ -553,8 +546,10 @@ impl TopicRequestHandler { } } } - let topic_config_and_queue_mapping = - TopicConfigAndQueueMapping::new(topic_config.unwrap(), topic_queue_mapping_detail); + let topic_config_and_queue_mapping = TopicConfigAndQueueMapping::new( + topic_config.unwrap().try_unwrap().unwrap(), + topic_queue_mapping_detail, + ); response.set_body_mut_ref( topic_config_and_queue_mapping .encode() diff --git a/rocketmq-broker/src/processor/notification_processor.rs b/rocketmq-broker/src/processor/notification_processor.rs index 445998cb1..835bfb0d5 100644 --- a/rocketmq-broker/src/processor/notification_processor.rs +++ b/rocketmq-broker/src/processor/notification_processor.rs @@ -109,7 +109,7 @@ impl NotificationProcessor { async fn has_msg_from_topic( &self, - topic_config: Option<&TopicConfig>, + topic_config: Option<&ArcMut>, random_q: i32, request_header: &NotificationRequestHeader, ) -> bool { diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index bcfffed14..0cac7ccfe 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -605,7 +605,7 @@ where .await } else { self.pop_msg_from_queue( - &topic_config.topic_name.unwrap_or_default(), + &topic_config.topic_name.clone().unwrap_or_default(), &request_header.attempt_id.clone().unwrap_or_default(), false, get_message_result.clone(), diff --git a/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs b/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs index 922797d9c..f44ad565b 100644 --- a/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs +++ b/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs @@ -173,7 +173,7 @@ impl PopReviveService { { return; } - let mut topic_config = TopicConfig::new(topic.clone()); + let mut topic_config = ArcMut::new(TopicConfig::new(topic.clone())); topic_config.read_queue_nums = PopAckConstants::RETRY_QUEUE_NUM as u32; topic_config.write_queue_nums = PopAckConstants::RETRY_QUEUE_NUM as u32; topic_config.topic_filter_type = TopicFilterType::SingleTag; @@ -181,7 +181,7 @@ impl PopReviveService { topic_config.topic_sys_flag = 0; self.broker_runtime_inner .topic_config_manager_mut() - .update_topic_config(&mut topic_config); + .update_topic_config(topic_config.clone()); self.init_pop_retry_offset(topic, consumer_group); } diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 7331ab534..e592418b2 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -379,7 +379,7 @@ where .message_ext_inner .message, ); - if batch_uniq_id.is_some() && QueueTypeUtils::is_batch_cq(Some(topic_config).as_ref()) { + if batch_uniq_id.is_some() && QueueTypeUtils::is_batch_cq_arc_mut(Some(&topic_config)) { let sys_flag = batch_message .message_ext_broker_inner .message_ext_inner @@ -1027,7 +1027,7 @@ where response: &mut RemotingCommand, request: &RemotingCommand, msg: &mut MessageExt, - topic_config: &mut rocketmq_common::common::config::TopicConfig, + topic_config: &mut ArcMut, properties: &mut HashMap, ) -> bool { let mut new_topic = request_header.topic(); diff --git a/rocketmq-broker/src/slave/slave_synchronize.rs b/rocketmq-broker/src/slave/slave_synchronize.rs index 82309e8b6..cba4235e3 100644 --- a/rocketmq-broker/src/slave/slave_synchronize.rs +++ b/rocketmq-broker/src/slave/slave_synchronize.rs @@ -148,13 +148,14 @@ where .topic_config_manager() .topic_config_table(); - let mut topic_config_table = topic_config_table.lock(); - // Delete entries not in new config topic_config_table.retain(|key, _| new_topic_config_table.contains_key(key)); // Update with new entries - topic_config_table.extend(new_topic_config_table); + for (key, value) in new_topic_config_table.into_iter() { + topic_config_table.insert(key, ArcMut::new(value)); + } + drop(topic_config_table); self.broker_runtime_inner.topic_config_manager().persist(); } @@ -180,12 +181,13 @@ where .broker_runtime_inner .topic_config_manager() .topic_config_table(); - let mut topic_config_table = topic_config_table.lock(); // Delete entries not in new config topic_config_table.retain(|key, _| new_topic_config_table.contains_key(key)); // Update with new entries - topic_config_table.extend(new_topic_config_table); + for (key, value) in new_topic_config_table.into_iter() { + topic_config_table.insert(key, ArcMut::new(value)); + } drop(topic_config_table); self.broker_runtime_inner .topic_queue_mapping_manager() diff --git a/rocketmq-broker/src/topic/manager/topic_config_manager.rs b/rocketmq-broker/src/topic/manager/topic_config_manager.rs index c73c1798d..02dd20ba4 100644 --- a/rocketmq-broker/src/topic/manager/topic_config_manager.rs +++ b/rocketmq-broker/src/topic/manager/topic_config_manager.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use std::time::Duration; use cheetah_string::CheetahString; +use dashmap::DashMap; use rocketmq_common::common::attribute::attribute_util::AttributeUtil; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::common::config_manager::ConfigManager; @@ -47,7 +48,7 @@ use crate::broker_path_config_helper::get_topic_config_path; use crate::broker_runtime::BrokerRuntimeInner; pub(crate) struct TopicConfigManager { - topic_config_table: Arc>>, + topic_config_table: Arc>>, data_version: ArcMut, topic_config_table_lock: Arc>, broker_runtime_inner: ArcMut>, @@ -58,7 +59,7 @@ impl TopicConfigManager { pub fn new(broker_runtime_inner: ArcMut>, init: bool) -> Self { let mut manager = Self { - topic_config_table: Arc::new(parking_lot::Mutex::new(HashMap::new())), + topic_config_table: Arc::new(DashMap::with_capacity(1024)), data_version: ArcMut::new(DataVersion::default()), topic_config_table_lock: Default::default(), broker_runtime_inner, @@ -72,11 +73,11 @@ impl TopicConfigManager { fn init(&mut self) { //SELF_TEST_TOPIC { - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( TopicValidator::RMQ_SYS_SELF_TEST_TOPIC, 1, 1, - )); + ))); } //auto create topic setting @@ -91,20 +92,20 @@ impl TopicConfigManager { .broker_config() .topic_queue_config .default_topic_queue_nums; - self.put_topic_config(TopicConfig::with_perm( + self.put_topic_config(ArcMut::new(TopicConfig::with_perm( TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC, default_topic_queue_nums, default_topic_queue_nums, PermName::PERM_INHERIT | PermName::PERM_READ | PermName::PERM_WRITE, - )); + ))); } } { - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( TopicValidator::RMQ_SYS_BENCHMARK_TOPIC, 1024, 1024, - )); + ))); } { let topic = self @@ -124,7 +125,7 @@ impl TopicConfigManager { perm |= PermName::PERM_READ | PermName::PERM_WRITE; } config.perm = perm; - self.put_topic_config(config); + self.put_topic_config(ArcMut::new(config)); } { @@ -147,23 +148,23 @@ impl TopicConfigManager { perm |= PermName::PERM_READ | PermName::PERM_WRITE; } config.perm = perm; - self.put_topic_config(config); + self.put_topic_config(ArcMut::new(config)); } { - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( TopicValidator::RMQ_SYS_OFFSET_MOVED_EVENT, 1, 1, - )); + ))); } { - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( TopicValidator::RMQ_SYS_SCHEDULE_TOPIC, Self::SCHEDULE_TOPIC_QUEUE_NUM, Self::SCHEDULE_TOPIC_QUEUE_NUM, - )); + ))); } { @@ -174,7 +175,7 @@ impl TopicConfigManager { .msg_trace_topic_name .clone(); TopicValidator::add_system_topic(topic.as_str()); - self.put_topic_config(TopicConfig::with_queues(topic, 1, 1)); + self.put_topic_config(ArcMut::new(TopicConfig::with_queues(topic, 1, 1))); } } @@ -188,7 +189,7 @@ impl TopicConfigManager { mix_all::REPLY_TOPIC_POSTFIX ); TopicValidator::add_system_topic(topic.as_str()); - self.put_topic_config(TopicConfig::with_queues(topic, 1, 1)); + self.put_topic_config(ArcMut::new(TopicConfig::with_queues(topic, 1, 1))); } { @@ -200,11 +201,11 @@ impl TopicConfigManager { .as_str(), ); TopicValidator::add_system_topic(topic.as_str()); - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( topic, self.broker_runtime_inner.broker_config().revive_queue_num, self.broker_runtime_inner.broker_config().revive_queue_num, - )); + ))); } { @@ -217,23 +218,28 @@ impl TopicConfigManager { .broker_name, ); TopicValidator::add_system_topic(topic.as_str()); - self.put_topic_config(TopicConfig::with_perm(topic, 1, 1, PermName::PERM_INHERIT)); + self.put_topic_config(ArcMut::new(TopicConfig::with_perm( + topic, + 1, + 1, + PermName::PERM_INHERIT, + ))); } { - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC, 1, 1, - )); + ))); } { - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, 1, - )); + ))); } { @@ -243,18 +249,18 @@ impl TopicConfigManager { .timer_wheel_enable { TopicValidator::add_system_topic(timer_message_store::TIMER_TOPIC); - self.put_topic_config(TopicConfig::with_queues( + self.put_topic_config(ArcMut::new(TopicConfig::with_queues( timer_message_store::TIMER_TOPIC, 1, 1, - )); + ))); } } } #[inline] - pub fn select_topic_config(&self, topic: &CheetahString) -> Option { - self.topic_config_table.lock().get(topic).cloned() + pub fn select_topic_config(&self, topic: &CheetahString) -> Option> { + self.topic_config_table.get(topic).as_deref().cloned() } pub fn build_serialize_wrapper( @@ -277,13 +283,13 @@ impl TopicConfigManager { self.data_version.mut_from_ref().next_version(); } TopicConfigAndMappingSerializeWrapper { - topic_config_serialize_wrapper: rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper { - topic_config_table, - data_version: self.data_version.as_ref().clone(), - }, - topic_queue_mapping_info_map, - ..TopicConfigAndMappingSerializeWrapper::default() - } + topic_config_serialize_wrapper: rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper { + topic_config_table, + data_version: self.data_version.as_ref().clone(), + }, + topic_queue_mapping_info_map, + ..TopicConfigAndMappingSerializeWrapper::default() + } } #[inline] @@ -294,12 +300,15 @@ impl TopicConfigManager { } } - fn get_topic_config(&self, topic_name: &str) -> Option { - self.topic_config_table.lock().get(topic_name).cloned() + pub fn get_topic_config(&self, topic_name: &str) -> Option> { + self.topic_config_table.get(topic_name).as_deref().cloned() } - pub(crate) fn put_topic_config(&self, topic_config: TopicConfig) -> Option { - self.topic_config_table.lock().insert( + pub(crate) fn put_topic_config( + &self, + topic_config: ArcMut, + ) -> Option> { + self.topic_config_table.insert( topic_config.topic_name.as_ref().unwrap().clone(), topic_config, ) @@ -312,13 +321,14 @@ impl TopicConfigManager { remote_address: SocketAddr, client_default_topic_queue_nums: i32, topic_sys_flag: u32, - ) -> Option { + ) -> Option> { let (topic_config, create_new) = if let Some(_lock) = self .topic_config_table_lock .try_lock_for(Duration::from_secs(3)) { - if let Some(topic_config) = self.get_topic_config(topic) { - return Some(topic_config); + let topic_config = self.get_topic_config(topic); + if topic_config.is_some() { + return topic_config; } if let Some(mut default_topic_config) = self.get_topic_config(default_topic) { @@ -345,6 +355,7 @@ impl TopicConfigManager { "Create new topic by default topic:[{}] config:[{:?}] producer:[{}]", default_topic, topic_config, remote_address ); + let topic_config = ArcMut::new(topic_config); self.put_topic_config(topic_config.clone()); self.data_version.mut_from_ref().next_version_with( self.broker_runtime_inner @@ -371,7 +382,7 @@ impl TopicConfigManager { }; if create_new { - self.register_broker_data(topic_config.as_ref().unwrap()); + self.register_broker_data(topic_config.clone().unwrap()); } topic_config } @@ -383,13 +394,13 @@ impl TopicConfigManager { perm: u32, is_order: bool, topic_sys_flag: u32, - ) -> Option { - if let Some(ref mut config) = self.get_topic_config(topic) { + ) -> Option> { + if let Some(mut config) = self.get_topic_config(topic) { if is_order != config.order { config.order = is_order; - self.update_topic_config(config); + self.update_topic_config(config.clone()); } - return Some(config.clone()); + return Some(config); } let (topic_config, create_new) = if let Some(_lock) = self @@ -400,14 +411,14 @@ impl TopicConfigManager { return Some(config); } - let mut config = TopicConfig::new(topic); + let mut config = ArcMut::new(TopicConfig::new(topic)); config.read_queue_nums = client_default_topic_queue_nums as u32; config.write_queue_nums = client_default_topic_queue_nums as u32; config.perm = perm; config.topic_sys_flag = topic_sys_flag; config.order = is_order; - self.put_topic_config(config.clone()); + let current_ref = self.put_topic_config(config); self.data_version.mut_from_ref().next_version_with( self.broker_runtime_inner .message_store() @@ -416,18 +427,18 @@ impl TopicConfigManager { .get_state_machine_version(), ); self.persist(); - (Some(config), true) + (current_ref, true) } else { (None, false) }; if create_new { - self.register_broker_data(topic_config.as_ref().unwrap()); + self.register_broker_data(topic_config.clone().unwrap()); } topic_config } - fn register_broker_data(&mut self, topic_config: &TopicConfig) { + fn register_broker_data(&mut self, topic_config: ArcMut) { let broker_config = self.broker_runtime_inner.broker_config().clone(); let broker_runtime_inner = self.broker_runtime_inner.clone(); let topic_config_clone = topic_config.clone(); @@ -448,15 +459,19 @@ impl TopicConfigManager { }); } - pub fn update_topic_config_list(&mut self, topic_config_list: &mut [TopicConfig]) { + pub fn update_topic_config_list(&mut self, topic_config_list: &mut [ArcMut]) { for topic_config in topic_config_list { - self.update_topic_config(topic_config); + //can optimize todo + self.update_topic_config(topic_config.clone()); } } #[inline] - pub fn remove_topic_config(&self, topic: &str) -> Option { - self.topic_config_table.lock().remove(topic) + pub fn remove_topic_config(&self, topic: &str) -> Option> { + match self.topic_config_table.remove(topic) { + None => None, + Some(value) => Some(value.1), + } } pub fn delete_topic_config(&self, topic: &CheetahString) { @@ -478,12 +493,11 @@ impl TopicConfigManager { } } - pub fn update_topic_config(&mut self, topic_config: &mut TopicConfig) { - let new_attributes = Self::request(topic_config); + pub fn update_topic_config(&mut self, mut topic_config: ArcMut) { + let new_attributes = Self::request(topic_config.as_ref()); let current_attributes = self.current(topic_config.topic_name.as_ref().unwrap().as_str()); let create = self .topic_config_table - .lock() .get(topic_config.topic_name.as_ref().unwrap().as_str()) .is_none(); @@ -540,15 +554,20 @@ impl TopicConfigManager { } } - pub fn topic_config_table( - &self, - ) -> Arc>> { + pub fn topic_config_table(&self) -> Arc>> { self.topic_config_table.clone() } + pub fn topic_config_table_hash_map(&self) -> HashMap { + self.topic_config_table + .iter() + .map(|entry| (entry.key().clone(), entry.value().as_ref().clone())) + .collect::>() + } + pub fn set_topic_config_table( &mut self, - topic_config_table: Arc>>, + topic_config_table: Arc>>, ) { self.topic_config_table = topic_config_table; } @@ -557,7 +576,7 @@ impl TopicConfigManager { &mut self, client_default_topic_queue_nums: i32, perm: u32, - ) -> Option { + ) -> Option> { if let Some(ref mut config) = self.get_topic_config(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC) { @@ -571,10 +590,12 @@ impl TopicConfigManager { if let Some(config) = self.get_topic_config(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC) { - return Some(config); + return Some(config.clone()); } - let mut config = TopicConfig::new(TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + let mut config = ArcMut::new(TopicConfig::new( + TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, + )); config.read_queue_nums = client_default_topic_queue_nums as u32; config.write_queue_nums = client_default_topic_queue_nums as u32; config.perm = perm; @@ -595,13 +616,13 @@ impl TopicConfigManager { }; if create_new { - self.register_broker_data(topic_config.as_ref().unwrap()); + self.register_broker_data(topic_config.clone().unwrap()); } topic_config } pub fn contains_topic(&self, topic: &CheetahString) -> bool { - self.topic_config_table.lock().contains_key(topic) + self.topic_config_table.contains_key(topic) } pub fn data_version(&self) -> ArcMut { @@ -637,7 +658,12 @@ impl ConfigManager for TopicConfigManager { } fn encode_pretty(&self, pretty_format: bool) -> String { - let topic_config_table = self.topic_config_table.lock().clone(); + let topic_config_table = self + .topic_config_table + .iter() + .map(|entry| (entry.key().clone(), entry.value().as_ref().clone())) + .collect::>(); + let version = self.data_version().as_ref().clone(); match pretty_format { true => TopicConfigSerializeWrapper::new(Some(topic_config_table), Some(version)) @@ -654,16 +680,14 @@ impl ConfigManager for TopicConfigManager { if json_string.is_empty() { return; } - let wrapper = SerdeJsonUtils::from_json_str::(json_string) + let mut wrapper = SerdeJsonUtils::from_json_str::(json_string) .expect("Decode TopicConfigSerializeWrapper from json failed"); if let Some(value) = wrapper.data_version() { self.data_version.mut_from_ref().assign_new_one(value); } - if let Some(map) = wrapper.topic_config_table() { + if let Some(map) = wrapper.take_topic_config_table() { for (key, value) in map { - self.topic_config_table - .lock() - .insert(key.clone(), value.clone()); + self.topic_config_table.insert(key, ArcMut::new(value)); } } } diff --git a/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs b/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs index 5c8d957f9..f41eee01b 100644 --- a/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs +++ b/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs @@ -227,7 +227,7 @@ where found_list } - pub fn select_topic_config(&mut self, topic: &CheetahString) -> Option { + pub fn select_topic_config(&mut self, topic: &CheetahString) -> Option> { let mut topic_config = self .broker_runtime_inner .topic_config_manager() diff --git a/rocketmq-broker/src/util/hook_utils.rs b/rocketmq-broker/src/util/hook_utils.rs index 28766a150..83d3050a0 100644 --- a/rocketmq-broker/src/util/hook_utils.rs +++ b/rocketmq-broker/src/util/hook_utils.rs @@ -15,12 +15,12 @@ * limitations under the License. */ -use std::collections::HashMap; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; use std::sync::Arc; use cheetah_string::CheetahString; +use dashmap::DashMap; use rocketmq_common::common::broker::broker_role::BrokerRole; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::common::message::message_ext::MessageExt; @@ -138,7 +138,7 @@ impl HookUtils { } pub fn check_inner_batch( - topic_config_table: &Arc>>, + topic_config_table: &Arc>>, msg: &MessageExt, ) -> Option { if msg @@ -156,9 +156,9 @@ impl HookUtils { } if MessageSysFlag::check(msg.sys_flag(), MessageSysFlag::INNER_BATCH_FLAG) { - let topic_config_table_guard = topic_config_table.lock(); - let topic_config = topic_config_table_guard.get(msg.topic()); - if !QueueTypeUtils::is_batch_cq(topic_config) { + let topic_config_ref = topic_config_table.get(msg.topic()); + let topic_config = topic_config_ref.as_deref(); + if !QueueTypeUtils::is_batch_cq_arc_mut(topic_config) { error!("[BUG]The message is an inner batch but cq type is not batch cq"); return Some(PutMessageResult::new_default( PutMessageStatus::MessageIllegal, @@ -416,7 +416,6 @@ impl HookUtils { #[cfg(test)] mod tests { - use std::collections::HashMap; use std::sync::Arc; use rocketmq_common::common::config::TopicConfig; @@ -428,9 +427,12 @@ mod tests { #[test] fn check_inner_batch_returns_message_illegal_when_inner_batch_flag_is_set_but_cq_type_is_not_batch_cq( ) { - let mut topic_config_table = HashMap::new(); - topic_config_table.insert("test_topic".into(), TopicConfig::default()); - let topic_config_table = Arc::new(parking_lot::Mutex::new(topic_config_table)); + let topic_config_table = DashMap::new(); + topic_config_table.insert( + CheetahString::from_static_str("test_topic"), + ArcMut::new(TopicConfig::default()), + ); + let topic_config_table = Arc::new(topic_config_table); let mut msg = MessageExt::default(); msg.message.topic = "test_topic".into(); msg.set_sys_flag(MessageSysFlag::INNER_BATCH_FLAG); diff --git a/rocketmq-common/src/utils/cleanup_policy_utils.rs b/rocketmq-common/src/utils/cleanup_policy_utils.rs index 841e98870..f9b403877 100644 --- a/rocketmq-common/src/utils/cleanup_policy_utils.rs +++ b/rocketmq-common/src/utils/cleanup_policy_utils.rs @@ -17,6 +17,8 @@ use std::str::FromStr; +use rocketmq_rust::ArcMut; + use crate::common::attribute::cleanup_policy::CleanupPolicy; use crate::common::attribute::Attribute; use crate::common::config::TopicConfig; @@ -48,6 +50,25 @@ pub fn get_delete_policy(topic_config: Option<&TopicConfig>) -> CleanupPolicy { } } +pub fn get_delete_policy_arc_mut(topic_config: Option<&ArcMut>) -> CleanupPolicy { + match topic_config { + Some(config) => { + let attribute_name = TopicAttributes::cleanup_policy_attribute().name(); + match config.attributes.get(attribute_name) { + Some(value) => CleanupPolicy::from_str(value.as_str()).unwrap(), + None => CleanupPolicy::from_str( + TopicAttributes::cleanup_policy_attribute().default_value(), + ) + .unwrap(), + } + } + None => { + CleanupPolicy::from_str(TopicAttributes::cleanup_policy_attribute().default_value()) + .unwrap() + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rocketmq-common/src/utils/queue_type_utils.rs b/rocketmq-common/src/utils/queue_type_utils.rs index 12652fdfd..900a81a39 100644 --- a/rocketmq-common/src/utils/queue_type_utils.rs +++ b/rocketmq-common/src/utils/queue_type_utils.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use rocketmq_rust::ArcMut; + use crate::common::attribute::cq_type::CQType; use crate::common::attribute::Attribute; use crate::common::config::TopicConfig; @@ -31,6 +33,30 @@ impl QueueTypeUtils { .unwrap_or(CQType::SimpleCQ), } } + + pub fn is_batch_cq_arc_mut(topic_config: Option<&ArcMut>) -> bool { + Self::get_cq_type_arc_mut(topic_config) == CQType::BatchCQ + } + + pub fn get_cq_type_arc_mut(topic_config: Option<&ArcMut>) -> CQType { + match topic_config { + Some(config) => { + let default_value = TopicAttributes::queue_type_attribute().default_value(); + + let attribute_name = TopicAttributes::queue_type_attribute().name(); + match config.attributes.get(attribute_name) { + Some(value) => value + .parse() + .unwrap_or(default_value.parse().unwrap_or(CQType::SimpleCQ)), + None => default_value.parse().unwrap_or(CQType::SimpleCQ), + } + } + None => TopicAttributes::queue_type_attribute() + .default_value() + .parse() + .unwrap_or(CQType::SimpleCQ), + } + } } #[cfg(test)] diff --git a/rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs b/rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs index 3c1d048a8..7278ab3e1 100644 --- a/rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs +++ b/rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs @@ -15,11 +15,12 @@ * limitations under the License. */ use rocketmq_common::common::config::TopicConfig; +use rocketmq_rust::ArcMut; use serde::Deserialize; use serde::Serialize; #[derive(Debug, Clone, Serialize, Deserialize, Default)] #[serde(rename_all = "camelCase")] pub struct CreateTopicListRequestBody { - pub topic_config_list: Vec, + pub topic_config_list: Vec>, } diff --git a/rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs b/rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs index 10b9a7b3b..abd75a2d6 100644 --- a/rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs +++ b/rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs @@ -73,4 +73,8 @@ impl TopicConfigSerializeWrapper { pub fn set_data_version(&mut self, data_version: Option) { self.data_version = data_version; } + + pub fn take_topic_config_table(&mut self) -> Option> { + self.topic_config_table.take() + } } diff --git a/rocketmq-store/Cargo.toml b/rocketmq-store/Cargo.toml index 2f358584e..1c44da704 100644 --- a/rocketmq-store/Cargo.toml +++ b/rocketmq-store/Cargo.toml @@ -59,6 +59,8 @@ uuid = { workspace = true } anyhow = { workspace = true } page_size = "0.6.0" +dashmap = { workspace = true } + [target.'cfg(unix)'.dependencies] libc = "0.2.177" diff --git a/rocketmq-store/src/base/append_message_callback.rs b/rocketmq-store/src/base/append_message_callback.rs index dc76b5850..439febad3 100644 --- a/rocketmq-store/src/base/append_message_callback.rs +++ b/rocketmq-store/src/base/append_message_callback.rs @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; use bytes::Buf; use bytes::BufMut; use cheetah_string::CheetahString; +use dashmap::DashMap; use rocketmq_common::common::config::TopicConfig; use rocketmq_common::common::message::message_batch::MessageExtBatch; use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner; @@ -29,6 +29,7 @@ use rocketmq_common::utils::message_utils; use rocketmq_common::CRC32Utils::crc32; use rocketmq_common::MessageDecoder::create_crc32; use rocketmq_common::MessageUtils::build_batch_message_id; +use rocketmq_rust::ArcMut; use rocketmq_rust::SyncUnsafeCellWrapper; use crate::base::message_result::AppendMessageResult; @@ -83,13 +84,13 @@ pub(crate) struct DefaultAppendMessageCallback { msg_store_item_memory: SyncUnsafeCellWrapper, crc32_reserved_length: i32, message_store_config: Arc, - topic_config_table: Arc>>, + topic_config_table: Arc>>, } impl DefaultAppendMessageCallback { pub fn new( message_store_config: Arc, - topic_config_table: Arc>>, + topic_config_table: Arc>>, ) -> Self { let crc32_reserved_length = if message_store_config.enabled_append_prop_crc { CRC32_RESERVED_LEN diff --git a/rocketmq-store/src/log_file/commit_log.rs b/rocketmq-store/src/log_file/commit_log.rs index 3fcdddd7b..90b0ae5bd 100644 --- a/rocketmq-store/src/log_file/commit_log.rs +++ b/rocketmq-store/src/log_file/commit_log.rs @@ -26,6 +26,7 @@ use bytes::Buf; use bytes::Bytes; use bytes::BytesMut; use cheetah_string::CheetahString; +use dashmap::DashMap; use rocketmq_common::common::attribute::cq_type::CQType; use rocketmq_common::common::broker::broker_config::BrokerConfig; use rocketmq_common::common::broker::broker_role::BrokerRole; @@ -152,16 +153,15 @@ fn generate_key(msg: &MessageExtBrokerInner) -> String { } pub fn get_cq_type( - topic_config_table: &Arc>>, + topic_config_table: &Arc>>, msg_inner: &MessageExtBrokerInner, ) -> CQType { - let topic_config_table_guard = topic_config_table.lock(); - let option = topic_config_table_guard.get(msg_inner.topic()); - QueueTypeUtils::get_cq_type(option) + let binding = topic_config_table.get(msg_inner.topic()); + QueueTypeUtils::get_cq_type_arc_mut(binding.as_deref()) } pub fn get_message_num( - topic_config_table: &Arc>>, + topic_config_table: &Arc>>, msg_inner: &MessageExtBrokerInner, ) -> i16 { let mut message_num = 1i16; @@ -196,7 +196,7 @@ pub struct CommitLog { append_message_callback: Arc, put_message_lock: Arc>, topic_queue_lock: Arc, - topic_config_table: Arc>>, + topic_config_table: Arc>>, consume_queue_store: ConsumeQueueStore, flush_manager: Arc>, begin_time_in_lock: Arc, @@ -209,7 +209,7 @@ impl CommitLog { broker_config: Arc, dispatcher: ArcMut, store_checkpoint: Arc, - topic_config_table: Arc>>, + topic_config_table: Arc>>, consume_queue_store: ConsumeQueueStore, ) -> Self { let enabled_append_prop_crc = message_store_config.enabled_append_prop_crc; diff --git a/rocketmq-store/src/message_store/local_file_message_store.rs b/rocketmq-store/src/message_store/local_file_message_store.rs index 3112604dc..eb9bbb4fd 100644 --- a/rocketmq-store/src/message_store/local_file_message_store.rs +++ b/rocketmq-store/src/message_store/local_file_message_store.rs @@ -42,6 +42,7 @@ use bytes::Buf; use bytes::Bytes; use bytes::BytesMut; use cheetah_string::CheetahString; +use dashmap::DashMap; use rocketmq_common::common::attribute::cleanup_policy::CleanupPolicy; use rocketmq_common::common::boundary_type::BoundaryType; use rocketmq_common::common::broker::broker_config::BrokerConfig; @@ -62,6 +63,7 @@ use rocketmq_common::common::system_clock::SystemClock; use rocketmq_common::utils::queue_type_utils::QueueTypeUtils; use rocketmq_common::utils::util_all; use rocketmq_common::CleanupPolicyUtils::get_delete_policy; +use rocketmq_common::CleanupPolicyUtils::get_delete_policy_arc_mut; use rocketmq_common::FileUtils::string_to_file; use rocketmq_common::MessageDecoder; use rocketmq_common::TimeUtils::get_current_millis; @@ -125,7 +127,7 @@ pub struct LocalFileMessageStore { message_store_config: Arc, broker_config: Arc, put_message_hook_list: Vec, - topic_config_table: Arc>>, + topic_config_table: Arc>>, commit_log: ArcMut, store_checkpoint: Option>, @@ -164,7 +166,7 @@ impl LocalFileMessageStore { pub fn new( message_store_config: Arc, broker_config: Arc, - topic_config_table: Arc>>, + topic_config_table: Arc>>, broker_stats_manager: Option>, notify_message_arrive_in_batch: bool, ) -> Self { @@ -316,11 +318,11 @@ impl Drop for LocalFileMessageStore { impl LocalFileMessageStore { #[inline] - pub fn get_topic_config(&self, topic: &CheetahString) -> Option { - if self.topic_config_table.lock().is_empty() { + pub fn get_topic_config(&self, topic: &CheetahString) -> Option> { + if self.topic_config_table.is_empty() { return None; } - self.topic_config_table.lock().get(topic).cloned() + self.topic_config_table.get(topic).as_deref().cloned() } fn is_temp_file_exist(&self) -> bool { @@ -788,7 +790,7 @@ impl MessageStore for LocalFileMessageStore { if MessageSysFlag::check(msg.sys_flag(), MessageSysFlag::INNER_BATCH_FLAG) { let topic_config = self.get_topic_config(msg.topic()); - if !QueueTypeUtils::is_batch_cq(topic_config.as_ref()) { + if !QueueTypeUtils::is_batch_cq_arc_mut(topic_config.as_ref()) { error!("[BUG]The message is an inner batch but cq type is not batch cq"); return PutMessageResult::new_default(PutMessageStatus::MessageIllegal); } @@ -883,7 +885,7 @@ impl MessageStore for LocalFileMessageStore { return None; } let topic_config = self.get_topic_config(topic); - let policy = get_delete_policy(topic_config.as_ref()); + let policy = get_delete_policy_arc_mut(topic_config.as_ref()); if policy == CleanupPolicy::COMPACTION && self.message_store_config.enable_compaction { //not implemented will be implemented in the future return self.compaction_store.get_message( diff --git a/rocketmq-store/src/queue/local_file_consume_queue_store.rs b/rocketmq-store/src/queue/local_file_consume_queue_store.rs index 5b5141f71..0658ebc57 100644 --- a/rocketmq-store/src/queue/local_file_consume_queue_store.rs +++ b/rocketmq-store/src/queue/local_file_consume_queue_store.rs @@ -427,7 +427,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore { let consume_queue = topic_map.entry(queue_id).or_insert_with(|| { let message_store = self.inner.message_store.as_ref().unwrap(); let option = message_store.get_topic_config(topic); - match QueueTypeUtils::get_cq_type(option.as_ref()) { + match QueueTypeUtils::get_cq_type_arc_mut(option.as_ref()) { CQType::SimpleCQ | CQType::RocksDBCQ => ArcMut::new(Box::new(ConsumeQueue::new( topic.clone(), queue_id, @@ -633,7 +633,7 @@ impl ConsumeQueueStore { .as_ref() .unwrap() .get_topic_config(topic); - let act = QueueTypeUtils::get_cq_type(topic_config.as_ref()); + let act = QueueTypeUtils::get_cq_type_arc_mut(topic_config.as_ref()); if act != cq_type { panic!("The queue type of topic: {topic} should be {cq_type:?}, but is {act:?}",); } diff --git a/rocketmq/Cargo.toml b/rocketmq/Cargo.toml index 1a73822cb..e385b582e 100644 --- a/rocketmq/Cargo.toml +++ b/rocketmq/Cargo.toml @@ -25,6 +25,7 @@ cron = "0.15" anyhow = { workspace = true } tokio-util = "0.7.16" parking_lot = { workspace = true } +serde = { workspace = true } [dev-dependencies] tokio-test = "0.4" diff --git a/rocketmq/src/arc_mut.rs b/rocketmq/src/arc_mut.rs index 8c5417ac0..8a9bf5213 100644 --- a/rocketmq/src/arc_mut.rs +++ b/rocketmq/src/arc_mut.rs @@ -28,6 +28,11 @@ use std::ops::DerefMut; use std::sync::Arc; use std::sync::Weak; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; + /// A weak version of `ArcMut` that doesn't prevent the inner value from being dropped. /// /// # Safety @@ -246,6 +251,32 @@ impl std::fmt::Debug for ArcMut { } } +impl Serialize for ArcMut +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let inner_ref = unsafe { &*self.inner.get() }; + inner_ref.serialize(serializer) + } +} + +impl<'de, T> Deserialize<'de> for ArcMut +where + T: Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let inner = T::deserialize(deserializer)?; + Ok(ArcMut::new(inner)) + } +} + #[cfg(test)] mod arc_cell_wrapper_tests { use std::sync::Arc;