Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ impl BrokerRuntime {
}

impl<MS: MessageStore> BrokerRuntimeInner<MS> {
pub async fn register_single_topic_all(&self, topic_config: TopicConfig) {
pub async fn register_single_topic_all(&self, topic_config: ArcMut<TopicConfig>) {
let mut topic_config = topic_config;
if !PermName::is_writeable(self.broker_config.broker_permission)
|| !PermName::is_readable(self.broker_config.broker_permission)
Expand All @@ -1352,7 +1352,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {

pub async fn register_increment_broker_data(
this: ArcMut<BrokerRuntimeInner<MS>>,
topic_config_list: Vec<TopicConfig>,
topic_config_list: Vec<ArcMut<TopicConfig>>,
data_version: DataVersion,
) {
let mut serialize_wrapper = TopicConfigAndMappingSerializeWrapper {
Expand All @@ -1371,10 +1371,10 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
{
TopicConfig {
perm: topic_config.perm & this.broker_config().broker_permission,
..topic_config.clone()
..topic_config.as_ref().clone()
}
} else {
topic_config.clone()
topic_config.as_ref().clone()
};
topic_config_table.insert(
register_topic_config.topic_name.as_ref().unwrap().clone(),
Expand Down Expand Up @@ -1495,7 +1495,6 @@ impl<MS: MessageStore> StateGetter for ProducerStateGetter<MS> {
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.contains_key(NamespaceUtil::wrap_namespace(instance_id, topic).as_str())
{
self.broker_runtime_inner
Expand Down Expand Up @@ -1528,7 +1527,6 @@ impl<MS: MessageStore> StateGetter for ConsumerStateGetter<MS> {
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.contains_key(topic)
{
let topic_full_name =
Expand Down Expand Up @@ -2340,7 +2338,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
) {
let mut topic_config_table = HashMap::new();
let table = self.topic_config_manager().topic_config_table();
for topic_config in table.lock().values() {
for topic_config in table.iter() {
let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission)
|| !PermName::is_readable(self.broker_config.broker_permission)
{
Expand All @@ -2352,7 +2350,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
..TopicConfig::default()
}
} else {
topic_config.clone()
topic_config.as_ref().clone()
};
topic_config_table.insert(
new_topic_config.topic_name.as_ref().unwrap().clone(),
Expand Down
9 changes: 4 additions & 5 deletions rocketmq-broker/src/hook/batch_check_before_put_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::Arc;

use cheetah_string::CheetahString;
use dashmap::DashMap;
use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_result::PutMessageResult;
use rocketmq_store::hook::put_message_hook::PutMessageHook;
use tracing::warn;

use crate::util::hook_utils::HookUtils;

pub struct BatchCheckBeforePutMessageHook {
topic_config_table: Arc<parking_lot::Mutex<HashMap<CheetahString, TopicConfig>>>,
topic_config_table: Arc<DashMap<CheetahString, ArcMut<TopicConfig>>>,
}

impl BatchCheckBeforePutMessageHook {
pub fn new(
topic_config_table: Arc<parking_lot::Mutex<HashMap<CheetahString, TopicConfig>>>,
) -> Self {
pub fn new(topic_config_table: Arc<DashMap<CheetahString, ArcMut<TopicConfig>>>) -> Self {
Self { topic_config_table }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl<MS: MessageStore> ConsumerOrderInfoManager<MS> {
if *queue_id == topic_config.read_queue_nums as i32 {
queues_to_remove.push(*queue_id);
info!(
"Queue not exist, Clean order info, {}:{}, {}",
"Queue not exist, Clean order info, {}:{}, {:?}",
topic_at_group, order_info, topic_config
);
continue;
Expand Down
7 changes: 5 additions & 2 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ impl BrokerOuterAPI {
}
}

fn create_request(broker_name: CheetahString, topic_config: TopicConfig) -> RemotingCommand {
fn create_request(
broker_name: CheetahString,
topic_config: ArcMut<TopicConfig>,
) -> RemotingCommand {
let request_header =
RegisterTopicRequestHeader::new(topic_config.topic_name.as_ref().cloned().unwrap());
let queue_data = QueueData::new(
Expand Down Expand Up @@ -328,7 +331,7 @@ impl BrokerOuterAPI {
pub async fn register_single_topic_all(
&self,
broker_name: CheetahString,
topic_config: TopicConfig,
topic_config: ArcMut<TopicConfig>,
timeout_mills: u64,
) {
let request = Self::create_request(broker_name, topic_config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
}
};

let mut topic_config = TopicConfig {
let topic_config = ArcMut::new(TopicConfig {
topic_name: Some(topic.clone()),
read_queue_nums: request_header.read_queue_nums as u32,
write_queue_nums: request_header.write_queue_nums as u32,
Expand All @@ -137,7 +137,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
},
order: request_header.order,
attributes,
};
});
if topic_config.get_topic_message_type() == TopicMessageType::Mixed
&& !self
.broker_runtime_inner
Expand All @@ -154,10 +154,8 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
let topic_config_origin = self
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.get(&topic)
.cloned();
.get_topic_config(topic.as_str())
.clone();
Comment on lines +157 to +158
Copy link

Copilot AI Oct 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call chain .get_topic_config(topic.as_str()).clone() is redundant. The get_topic_config method already returns Option<ArcMut<TopicConfig>> which clones the ArcMut internally via as_deref().cloned(). The additional .clone() here creates an unnecessary extra reference count increment.

Suggested change
.get_topic_config(topic.as_str())
.clone();
.get_topic_config(topic.as_str());

Copilot uses AI. Check for mistakes.
if topic_config_origin.is_some() && topic_config == topic_config_origin.unwrap() {
info!(
"Broker receive request to update or create topic={}, but topicConfig has no \
Expand All @@ -169,7 +167,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
}
self.broker_runtime_inner
.topic_config_manager_mut()
.update_topic_config(&mut topic_config);
.update_topic_config(topic_config.clone());

if self
.broker_runtime_inner
Expand All @@ -179,7 +177,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
self.broker_runtime_inner
.topic_config_manager()
.broker_runtime_inner()
.register_single_topic_all(topic_config)
.register_single_topic_all(topic_config.clone())
.await;
} else {
/* self.broker_runtime_inner
Expand Down Expand Up @@ -257,10 +255,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
let topic_config_origin = self
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.get(topic)
.cloned();
.get_topic_config(topic);
if topic_config_origin.is_some() && topic_config.clone() == topic_config_origin.unwrap()
{
info!(
Expand Down Expand Up @@ -405,9 +400,7 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
topic_config_table: self
.broker_runtime_inner
.topic_config_manager()
.topic_config_table()
.lock()
.clone(),
.topic_config_table_hash_map(),
},
..Default::default()
};
Expand Down Expand Up @@ -553,8 +546,10 @@ impl<MS: MessageStore> TopicRequestHandler<MS> {
}
}
}
let topic_config_and_queue_mapping =
TopicConfigAndQueueMapping::new(topic_config.unwrap(), topic_queue_mapping_detail);
let topic_config_and_queue_mapping = TopicConfigAndQueueMapping::new(
topic_config.unwrap().try_unwrap().unwrap(),
topic_queue_mapping_detail,
);
response.set_body_mut_ref(
topic_config_and_queue_mapping
.encode()
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor/notification_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<MS: MessageStore> NotificationProcessor<MS> {

async fn has_msg_from_topic(
&self,
topic_config: Option<&TopicConfig>,
topic_config: Option<&ArcMut<TopicConfig>>,
random_q: i32,
request_header: &NotificationRequestHeader,
) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ where
.await
} else {
self.pop_msg_from_queue(
&topic_config.topic_name.unwrap_or_default(),
&topic_config.topic_name.clone().unwrap_or_default(),
&request_header.attempt_id.clone().unwrap_or_default(),
false,
get_message_result.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ impl<MS: MessageStore> PopReviveService<MS> {
{
return;
}
let mut topic_config = TopicConfig::new(topic.clone());
let mut topic_config = ArcMut::new(TopicConfig::new(topic.clone()));
topic_config.read_queue_nums = PopAckConstants::RETRY_QUEUE_NUM as u32;
topic_config.write_queue_nums = PopAckConstants::RETRY_QUEUE_NUM as u32;
topic_config.topic_filter_type = TopicFilterType::SingleTag;
topic_config.perm = 6;
topic_config.topic_sys_flag = 0;
self.broker_runtime_inner
.topic_config_manager_mut()
.update_topic_config(&mut topic_config);
.update_topic_config(topic_config.clone());
self.init_pop_retry_offset(topic, consumer_group);
}

Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ where
.message_ext_inner
.message,
);
if batch_uniq_id.is_some() && QueueTypeUtils::is_batch_cq(Some(topic_config).as_ref()) {
if batch_uniq_id.is_some() && QueueTypeUtils::is_batch_cq_arc_mut(Some(&topic_config)) {
let sys_flag = batch_message
.message_ext_broker_inner
.message_ext_inner
Expand Down Expand Up @@ -1027,7 +1027,7 @@ where
response: &mut RemotingCommand,
request: &RemotingCommand,
msg: &mut MessageExt,
topic_config: &mut rocketmq_common::common::config::TopicConfig,
topic_config: &mut ArcMut<rocketmq_common::common::config::TopicConfig>,
properties: &mut HashMap<CheetahString, CheetahString>,
) -> bool {
let mut new_topic = request_header.topic();
Expand Down
12 changes: 7 additions & 5 deletions rocketmq-broker/src/slave/slave_synchronize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,14 @@ where
.topic_config_manager()
.topic_config_table();

let mut topic_config_table = topic_config_table.lock();

// Delete entries not in new config
topic_config_table.retain(|key, _| new_topic_config_table.contains_key(key));

// Update with new entries
topic_config_table.extend(new_topic_config_table);
for (key, value) in new_topic_config_table.into_iter() {
topic_config_table.insert(key, ArcMut::new(value));
}

drop(topic_config_table);
self.broker_runtime_inner.topic_config_manager().persist();
}
Expand All @@ -180,12 +181,13 @@ where
.broker_runtime_inner
.topic_config_manager()
.topic_config_table();
let mut topic_config_table = topic_config_table.lock();
// Delete entries not in new config
topic_config_table.retain(|key, _| new_topic_config_table.contains_key(key));

// Update with new entries
topic_config_table.extend(new_topic_config_table);
for (key, value) in new_topic_config_table.into_iter() {
topic_config_table.insert(key, ArcMut::new(value));
}
drop(topic_config_table);
self.broker_runtime_inner
.topic_queue_mapping_manager()
Expand Down
Loading
Loading