Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ impl BrokerRuntime {
}

impl<MS: MessageStore> BrokerRuntimeInner<MS> {
pub async fn register_single_topic_all(&self, topic_config: TopicConfig) {
pub async fn register_single_topic_all(&self, topic_config: ArcMut<TopicConfig>) {
let mut topic_config = topic_config;
if !PermName::is_writeable(self.broker_config.broker_permission)
|| !PermName::is_readable(self.broker_config.broker_permission)
Expand All @@ -1352,7 +1352,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {

pub async fn register_increment_broker_data(
this: ArcMut<BrokerRuntimeInner<MS>>,
topic_config_list: Vec<TopicConfig>,
topic_config_list: Vec<ArcMut<TopicConfig>>,
data_version: DataVersion,
) {
let mut serialize_wrapper = TopicConfigAndMappingSerializeWrapper {
Expand All @@ -1371,10 +1371,10 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
{
TopicConfig {
perm: topic_config.perm & this.broker_config().broker_permission,
..topic_config.clone()
..topic_config.as_ref().clone()
}
} else {
topic_config.clone()
topic_config.as_ref().clone()
};
topic_config_table.insert(
register_topic_config.topic_name.as_ref().unwrap().clone(),
Expand Down Expand Up @@ -1495,7 +1495,6 @@ impl<MS: MessageStore> StateGetter for ProducerStateGetter<MS> {
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.contains_key(NamespaceUtil::wrap_namespace(instance_id, topic).as_str())
{
self.broker_runtime_inner
Expand Down Expand Up @@ -1528,7 +1527,6 @@ impl<MS: MessageStore> StateGetter for ConsumerStateGetter<MS> {
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.contains_key(topic)
{
let topic_full_name =
Expand Down Expand Up @@ -2340,7 +2338,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
) {
let mut topic_config_table = HashMap::new();
let table = self.topic_config_manager().topic_config_table();
for topic_config in table.lock().values() {
for topic_config in table.iter() {
let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission)
|| !PermName::is_readable(self.broker_config.broker_permission)
{
Expand All @@ -2352,7 +2350,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
..TopicConfig::default()
}
} else {
topic_config.clone()
topic_config.as_ref().clone()
};
topic_config_table.insert(
new_topic_config.topic_name.as_ref().unwrap().clone(),
Expand Down
9 changes: 4 additions & 5 deletions rocketmq-broker/src/hook/batch_check_before_put_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::Arc;

use cheetah_string::CheetahString;
use dashmap::DashMap;
use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_result::PutMessageResult;
use rocketmq_store::hook::put_message_hook::PutMessageHook;
use tracing::warn;

use crate::util::hook_utils::HookUtils;

pub struct BatchCheckBeforePutMessageHook {
topic_config_table: Arc<parking_lot::Mutex<HashMap<CheetahString, TopicConfig>>>,
topic_config_table: Arc<DashMap<CheetahString, ArcMut<TopicConfig>>>,
}

impl BatchCheckBeforePutMessageHook {
pub fn new(
topic_config_table: Arc<parking_lot::Mutex<HashMap<CheetahString, TopicConfig>>>,
) -> Self {
pub fn new(topic_config_table: Arc<DashMap<CheetahString, ArcMut<TopicConfig>>>) -> Self {
Self { topic_config_table }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl<MS: MessageStore> ConsumerOrderInfoManager<MS> {
if *queue_id == topic_config.read_queue_nums as i32 {
queues_to_remove.push(*queue_id);
info!(
"Queue not exist, Clean order info, {}:{}, {}",
"Queue not exist, Clean order info, {}:{}, {:?}",
topic_at_group, order_info, topic_config
);
continue;
Expand Down
7 changes: 5 additions & 2 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ impl BrokerOuterAPI {
}
}

fn create_request(broker_name: CheetahString, topic_config: TopicConfig) -> RemotingCommand {
fn create_request(
broker_name: CheetahString,
topic_config: ArcMut<TopicConfig>,
) -> RemotingCommand {
let request_header =
RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().cloned().unwrap());
let queue_data = QueueData::new(
Expand Down Expand Up @@ -328,7 +331,7 @@ impl BrokerOuterAPI {
pub async fn register_single_topic_all(
&self,
broker_name: CheetahString,
topic_config: TopicConfig,
topic_config: ArcMut<TopicConfig>,
timeout_mills: u64,
) {
let request = Self::create_request(broker_name, topic_config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
}
};

let mut topic_config = TopicConfig {
let topic_config = ArcMut::new(TopicConfig {
topic_name: Some(topic.clone()),
read_queue_nums: request_header.read_queue_nums as u32,
write_queue_nums: request_header.write_queue_nums as u32,
Expand All @@ -137,7 +137,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
},
order: request_header.order,
attributes,
};
});
if topic_config.get_topic_message_type() == TopicMessageType::Mixed
&& !self
.broker_runtime_inner
Expand All @@ -154,10 +154,8 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
let topic_config_origin = self
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.get(&topic)
.cloned();
.get_topic_config(topic.as_str())
.clone();
Comment on lines +157 to +158
Copy link

Copilot AI Oct 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call chain .get_topic_config(topic.as_str()).clone() is redundant. The get_topic_config method already returns Option<ArcMut<TopicConfig>> which clones the ArcMut internally via as_deref().cloned(). The additional .clone() here creates an unnecessary extra reference count increment.

Suggested change
.get_topic_config(topic.as_str())
.clone();
.get_topic_config(topic.as_str());

Copilot uses AI. Check for mistakes.
if topic_config_origin.is_some() && topic_config == topic_config_origin.unwrap() {
info!(
"Broker receive request to update or create topic={}, but topicConfig has no \
Expand All @@ -169,7 +167,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
}
self.broker_runtime_inner
.topic_config_manager_mut()
.update_topic_config(&mut topic_config);
.update_topic_config(topic_config.clone());

if self
.broker_runtime_inner
Expand All @@ -179,7 +177,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
self.broker_runtime_inner
.topic_config_manager()
.broker_runtime_inner()
.register_single_topic_all(topic_config)
.register_single_topic_all(topic_config.clone())
.await;
} else {
/* self.broker_runtime_inner
Expand Down Expand Up @@ -257,10 +255,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
let topic_config_origin = self
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.get(topic)
.cloned();
.get_topic_config(topic);
if topic_config_origin.is_some() && topic_config.clone() == topic_config_origin.unwrap()
{
info!(
Expand Down Expand Up @@ -405,9 +400,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
topic_config_table: self
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.clone(),
.topic_config_table_hash_map(),
},
..Default::default()
};
Expand Down Expand Up @@ -553,8 +546,10 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
}
}
}
let topic_config_and_queue_mapping =
TopicConfigAndQueueMapping::new(topic_config.unwrap(), topic_queue_mapping_detail);
let topic_config_and_queue_mapping = TopicConfigAndQueueMapping::new(
topic_config.unwrap().try_unwrap().unwrap(),
topic_queue_mapping_detail,
);
response.set_body_mut_ref(
topic_config_and_queue_mapping
.encode()
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor/notification_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<MS: MessageStore> NotificationProcessor<MS> {

async fn has_msg_from_topic(
&self,
topic_config: Option<&TopicConfig>,
topic_config: Option<&ArcMut<TopicConfig>>,
random_q: i32,
request_header: &NotificationRequestHeader,
) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ where
.await
} else {
self.pop_msg_from_queue(
&topic_config.topic_name.unwrap_or_default(),
&topic_config.topic_name.clone().unwrap_or_default(),
&request_header.attempt_id.clone().unwrap_or_default(),
false,
get_message_result.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ impl<MS: MessageStore> PopReviveService<MS> {
{
return;
}
let mut topic_config = TopicConfig::new(topic.clone());
let mut topic_config = ArcMut::new(TopicConfig::new(topic.clone()));
topic_config.read_queue_nums = PopAckConstants::RETRY_QUEUE_NUM as u32;
topic_config.write_queue_nums = PopAckConstants::RETRY_QUEUE_NUM as u32;
topic_config.topic_filter_type = TopicFilterType::SingleTag;
topic_config.perm = 6;
topic_config.topic_sys_flag = 0;
self.broker_runtime_inner
.topic_config_manager_mut()
.update_topic_config(&mut topic_config);
.update_topic_config(topic_config.clone());
self.init_pop_retry_offset(topic, consumer_group);
}

Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ where
.message_ext_inner
.message,
);
if batch_uniq_id.is_some() && QueueTypeUtils::is_batch_cq(Some(topic_config).as_ref()) {
if batch_uniq_id.is_some() && QueueTypeUtils::is_batch_cq_arc_mut(Some(&topic_config)) {
let sys_flag = batch_message
.message_ext_broker_inner
.message_ext_inner
Expand Down Expand Up @@ -1027,7 +1027,7 @@ where
response: &mut RemotingCommand,
request: &RemotingCommand,
msg: &mut MessageExt,
topic_config: &mut rocketmq_common::common::config::TopicConfig,
topic_config: &mut ArcMut<rocketmq_common::common::config::TopicConfig>,
properties: &mut HashMap<CheetahString, CheetahString>,
) -> bool {
let mut new_topic = request_header.topic();
Expand Down
12 changes: 7 additions & 5 deletions rocketmq-broker/src/slave/slave_synchronize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,14 @@ where
.topic_config_manager()
.topic_config_table();

let mut topic_config_table = topic_config_table.lock();

// Delete entries not in new config
topic_config_table.retain(|key, _| new_topic_config_table.contains_key(key));

// Update with new entries
topic_config_table.extend(new_topic_config_table);
for (key, value) in new_topic_config_table.into_iter() {
topic_config_table.insert(key, ArcMut::new(value));
}

drop(topic_config_table);
self.broker_runtime_inner.topic_config_manager().persist();
}
Expand All @@ -180,12 +181,13 @@ where
.broker_runtime_inner
.topic_config_manager()
.topic_config_table();
let mut topic_config_table = topic_config_table.lock();
// Delete entries not in new config
topic_config_table.retain(|key, _| new_topic_config_table.contains_key(key));

// Update with new entries
topic_config_table.extend(new_topic_config_table);
for (key, value) in new_topic_config_table.into_iter() {
topic_config_table.insert(key, ArcMut::new(value));
}
drop(topic_config_table);
self.broker_runtime_inner
.topic_queue_mapping_manager()
Expand Down
Loading
Loading