Skip to content

Commit e4d65af

Browse files
authored
[ISSUE #1053] Support request code UNLOCK_BATCH_MQ(42) (#1062)
1 parent b089d1a commit e4d65af

File tree

3 files changed

+64
-2
lines changed

3 files changed

+64
-2
lines changed

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatch
3737
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
3838
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
3939
use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
40+
use rocketmq_remoting::protocol::header::unlock_batch_mq_request_header::UnlockBatchMqRequestHeader;
4041
use rocketmq_remoting::protocol::namesrv::RegisterBrokerResult;
4142
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
4243
use rocketmq_remoting::protocol::route::route_data_view::QueueData;
@@ -359,6 +360,37 @@ impl BrokerOuterAPI {
359360
Err(e) => Err(BrokerClientError(e)),
360361
}
361362
}
363+
364+
pub async fn unlock_batch_mq_async(
365+
&self,
366+
addr: String,
367+
request_body: bytes::Bytes,
368+
timeout_millis: u64,
369+
) -> Result<()> {
370+
let mut request = RemotingCommand::create_request_command(
371+
RequestCode::UnlockBatchMq,
372+
UnlockBatchMqRequestHeader::default(),
373+
);
374+
request.set_body_mut_ref(Some(request_body));
375+
let result = self
376+
.remoting_client
377+
.invoke_async(Some(addr), request, timeout_millis)
378+
.await;
379+
match result {
380+
Ok(response) => {
381+
if ResponseCode::from(response.code()) == ResponseCode::Success {
382+
Ok(())
383+
} else {
384+
Err(BrokerError::MQBrokerError(
385+
response.code(),
386+
response.remark().cloned().unwrap_or("".to_string()),
387+
"".to_string(),
388+
))
389+
}
390+
}
391+
Err(e) => Err(BrokerClientError(e)),
392+
}
393+
}
362394
}
363395

364396
fn dns_lookup_address_by_domain(domain: &str) -> Vec<String> {

rocketmq-broker/src/processor/admin_broker_processor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ impl AdminBrokerProcessor {
209209
.await
210210
}
211211

212+
RequestCode::UnlockBatchMq => {
213+
self.batch_mq_handler
214+
.unlock_batch_mq(channel, ctx, request_code, request)
215+
.await
216+
}
212217
_ => Some(get_unknown_cmd_response(request_code)),
213218
}
214219
}

rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use rocketmq_remoting::code::request_code::RequestCode;
2424
use rocketmq_remoting::net::channel::Channel;
2525
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
2626
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
27+
use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody;
2728
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2829
use rocketmq_remoting::protocol::RemotingDeserializable;
2930
use rocketmq_remoting::protocol::RemotingSerializable;
@@ -122,8 +123,32 @@ impl BatchMqHandler {
122123
_channel: Channel,
123124
_ctx: ConnectionHandlerContext,
124125
_request_code: RequestCode,
125-
_request: RemotingCommand,
126+
request: RemotingCommand,
126127
) -> Option<RemotingCommand> {
127-
unimplemented!("unlockBatchMQ")
128+
let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap();
129+
if request_body.only_this_broker || !self.inner.broker_config.lock_in_strict_mode {
130+
self.inner.rebalance_lock_manager.unlock_batch(
131+
request_body.consumer_group.as_ref().unwrap(),
132+
&request_body.mq_set,
133+
request_body.client_id.as_ref().unwrap(),
134+
);
135+
} else {
136+
request_body.only_this_broker = true;
137+
let request_body = Bytes::from(request_body.encode());
138+
for broker_addr in self.inner.broker_member_group.broker_addrs.values() {
139+
match self
140+
.inner
141+
.broker_out_api
142+
.unlock_batch_mq_async(broker_addr.clone(), request_body.clone(), 1000)
143+
.await
144+
{
145+
Ok(_) => {}
146+
Err(e) => {
147+
warn!("unlockBatchMQ exception on {}, {}", broker_addr, e);
148+
}
149+
}
150+
}
151+
}
152+
Some(RemotingCommand::create_response_command())
128153
}
129154
}

0 commit comments

Comments
 (0)