Skip to content

Commit 92cbef7

Browse files
committed
[ISSUE ##1052] Support request code LOCK_BATCH_MQ(41)
1 parent f093729 commit 92cbef7

File tree

3 files changed

+14
-10
lines changed

3 files changed

+14
-10
lines changed

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use rocketmq_remoting::code::request_code::RequestCode;
3131
use rocketmq_remoting::code::response_code::ResponseCode;
3232
use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody;
3333
use rocketmq_remoting::protocol::body::kv_table::KVTable;
34-
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
3534
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
3635
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
3736
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
@@ -351,7 +350,7 @@ impl BrokerOuterAPI {
351350
Ok(lock_batch_response_body.lock_ok_mq_set)
352351
} else {
353352
Err(BrokerError::MQBrokerError(
354-
response.code() as i32,
353+
response.code(),
355354
response.remark().cloned().unwrap_or("".to_string()),
356355
"".to_string(),
357356
))

rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use bytes::Bytes;
2323
use rocketmq_remoting::code::request_code::RequestCode;
2424
use rocketmq_remoting::net::channel::Channel;
2525
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
26+
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
2627
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2728
use rocketmq_remoting::protocol::RemotingDeserializable;
2829
use rocketmq_remoting::protocol::RemotingSerializable;
@@ -74,18 +75,17 @@ impl BatchMqHandler {
7475
addr_map.extend(self.inner.broker_member_group.broker_addrs.clone());
7576
addr_map.remove(&self.inner.broker_config.broker_identity.broker_id);
7677

77-
let mut tasks = vec![];
78-
7978
let count_down_latch = CountDownLatch::new(addr_map.len() as u32);
8079
let request_body = Bytes::from(request_body.encode());
8180
let mq_lock_map_arc = Arc::new(Mutex::new(mq_lock_map.clone()));
8281
for (_, broker_addr) in addr_map {
8382
let count_down_latch = count_down_latch.clone();
8483
let broker_outer_api = self.inner.broker_out_api.clone();
8584
let mq_lock_map = mq_lock_map_arc.clone();
86-
tasks.push(tokio::spawn(async move {
85+
let request_body_cloned = request_body.clone();
86+
tokio::spawn(async move {
8787
let result = broker_outer_api
88-
.lock_batch_mq_async(broker_addr, request_body.clone(), 1000)
88+
.lock_batch_mq_async(broker_addr, request_body_cloned, 1000)
8989
.await;
9090
match result {
9191
Ok(lock_ok_mqs) => {
@@ -99,7 +99,7 @@ impl BatchMqHandler {
9999
}
100100
}
101101
count_down_latch.count_down().await;
102-
}));
102+
});
103103
}
104104
count_down_latch.wait_timeout(Duration::from_secs(2)).await;
105105

@@ -111,15 +111,18 @@ impl BatchMqHandler {
111111
}
112112
}
113113
}
114-
unimplemented!("lock_natch_mq")
114+
let response_body = LockBatchResponseBody {
115+
lock_ok_mq_set: lock_ok_mqset,
116+
};
117+
Some(RemotingCommand::create_response_command().set_body(Some(response_body.encode())))
115118
}
116119

117120
pub async fn unlock_batch_mq(
118121
&mut self,
119122
_channel: Channel,
120123
_ctx: ConnectionHandlerContext,
121124
_request_code: RequestCode,
122-
request: RemotingCommand,
125+
_request: RemotingCommand,
123126
) -> Option<RemotingCommand> {
124127
unimplemented!("unlockBatchMQ")
125128
}

rocketmq-namesrv/src/route/route_info_manager.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,9 @@ impl RouteInfoManager {
652652
BrokerMemberGroup::new(cluster_name.to_string(), broker_name.to_string());
653653
if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
654654
let map = broker_data.broker_addrs().clone();
655-
group_member.broker_addrs.extend(map);
655+
for (key, value) in map {
656+
group_member.broker_addrs.insert(key as u64, value);
657+
}
656658
}
657659
Some(group_member)
658660
}

0 commit comments

Comments
 (0)