Skip to content

Commit b089d1a

Browse files
authored
[ISSUE ##1052] Support request code LOCK_BATCH_MQ(41) (#1061)
1 parent 976a33b commit b089d1a

File tree

13 files changed

+271
-54
lines changed

13 files changed

+271
-54
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use rocketmq_common::common::statistics::state_getter::StateGetter;
3030
use rocketmq_common::ArcRefCellWrapper;
3131
use rocketmq_common::TimeUtils::get_current_millis;
3232
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
33+
use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
3334
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
3435
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper;
3536
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
@@ -114,6 +115,7 @@ pub(crate) struct BrokerRuntime {
114115
#[cfg(feature = "local_file_store")]
115116
pull_request_hold_service: Option<PullRequestHoldService<DefaultMessageStore>>,
116117
rebalance_lock_manager: Arc<RebalanceLockManager>,
118+
broker_member_group: Arc<BrokerMemberGroup>,
117119
}
118120

119121
impl Clone for BrokerRuntime {
@@ -147,6 +149,7 @@ impl Clone for BrokerRuntime {
147149
is_isolated: self.is_isolated.clone(),
148150
pull_request_hold_service: self.pull_request_hold_service.clone(),
149151
rebalance_lock_manager: self.rebalance_lock_manager.clone(),
152+
broker_member_group: self.broker_member_group.clone(),
150153
}
151154
}
152155
}
@@ -190,6 +193,14 @@ impl BrokerRuntime {
190193
}));
191194
let broker_stats_manager = Arc::new(stats_manager);
192195
consumer_manager.set_broker_stats_manager(Some(Arc::downgrade(&broker_stats_manager)));
196+
let mut broker_member_group = BrokerMemberGroup::new(
197+
broker_config.broker_identity.broker_cluster_name.clone(),
198+
broker_config.broker_identity.broker_name.clone(),
199+
);
200+
broker_member_group.broker_addrs.insert(
201+
broker_config.broker_identity.broker_id,
202+
broker_config.get_broker_addr(),
203+
);
193204
Self {
194205
broker_config: broker_config.clone(),
195206
message_store_config,
@@ -222,6 +233,7 @@ impl BrokerRuntime {
222233
is_isolated: Arc::new(AtomicBool::new(false)),
223234
pull_request_hold_service: None,
224235
rebalance_lock_manager: Arc::new(Default::default()),
236+
broker_member_group: Arc::new(broker_member_group),
225237
}
226238
}
227239

@@ -471,6 +483,8 @@ impl BrokerRuntime {
471483
self.consumer_manager.clone(),
472484
self.broker_out_api.clone(),
473485
self.broker_stats_manager.clone(),
486+
self.rebalance_lock_manager.clone(),
487+
self.broker_member_group.clone(),
474488
);
475489

476490
BrokerRequestProcessor {

rocketmq-broker/src/client/net/broker_to_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use rocketmq_remoting::net::channel::Channel;
1818
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
1919

2020
use crate::error::BrokerError::BrokerClientError;
21-
use crate::BrokerResult;
21+
use crate::Result;
2222

2323
#[derive(Default, Clone)]
2424
pub struct Broker2Client;
@@ -29,7 +29,7 @@ impl Broker2Client {
2929
channel: &mut Channel,
3030
request: RemotingCommand,
3131
timeout_millis: u64,
32-
) -> BrokerResult<RemotingCommand> {
32+
) -> Result<RemotingCommand> {
3333
match channel.send_wait_response(request, timeout_millis).await {
3434
Ok(value) => Ok(value),
3535
Err(e) => Err(BrokerClientError(e)),

rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717
use std::collections::HashMap;
18+
use std::collections::HashSet;
1819
use std::sync::atomic::AtomicI64;
1920
use std::sync::Arc;
2021

@@ -34,7 +35,7 @@ lazy_static! {
3435
};
3536
}
3637

37-
type MessageQueueLockTable = HashMap<String, HashMap<Arc<MessageQueue>, LockEntry>>;
38+
type MessageQueueLockTable = HashMap<String, HashMap<MessageQueue, LockEntry>>;
3839

3940
#[derive(Clone, Default)]
4041
pub struct RebalanceLockManager {
@@ -60,26 +61,25 @@ impl RebalanceLockManager {
6061
pub fn try_lock_batch(
6162
&self,
6263
group: &str,
63-
mqs: Vec<Arc<MessageQueue>>,
64+
mqs: &HashSet<MessageQueue>,
6465
client_id: &str,
65-
) -> Vec<Arc<MessageQueue>> {
66-
let mut lock_mqs = Vec::new();
67-
let mut not_locked_mqs = Vec::new();
66+
) -> HashSet<MessageQueue> {
67+
let mut lock_mqs = HashSet::with_capacity(mqs.len());
68+
let mut not_locked_mqs = HashSet::with_capacity(mqs.len());
6869
for mq in mqs.iter() {
6970
if self.is_locked(group, mq, client_id) {
70-
lock_mqs.push(mq.clone());
71+
lock_mqs.insert(mq.clone());
7172
} else {
72-
not_locked_mqs.push(mq.clone());
73+
not_locked_mqs.insert(mq.clone());
7374
}
7475
}
7576
if !not_locked_mqs.is_empty() {
7677
let mut write_guard = self.mq_lock_table.write();
77-
let mut group_value = write_guard.get_mut(group);
78-
if group_value.is_none() {
79-
group_value = Some(write_guard.entry(group.to_string()).or_default());
80-
}
81-
let group_value = group_value.unwrap();
82-
for mq in not_locked_mqs.iter() {
78+
let group_value = write_guard
79+
.entry(group.to_string())
80+
.or_insert(HashMap::with_capacity(32));
81+
82+
for mq in not_locked_mqs {
8383
let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| {
8484
info!(
8585
"RebalanceLockManager#tryLockBatch: lock a message which has not been \
@@ -96,7 +96,7 @@ impl RebalanceLockManager {
9696
get_current_millis() as i64,
9797
std::sync::atomic::Ordering::Relaxed,
9898
);
99-
lock_mqs.push(mq.clone());
99+
lock_mqs.insert(mq);
100100
continue;
101101
}
102102
let old_client_id = lock_entry.client_id.as_str().to_string();
@@ -106,12 +106,12 @@ impl RebalanceLockManager {
106106
get_current_millis() as i64,
107107
std::sync::atomic::Ordering::Relaxed,
108108
);
109-
lock_mqs.push(mq.clone());
110109
warn!(
111110
"RebalanceLockManager#tryLockBatch: try to lock a expired message queue, \
112111
group={} mq={:?}, old client id={}, new client id={}",
113112
group, mq, old_client_id, client_id
114113
);
114+
lock_mqs.insert(mq);
115115
continue;
116116
}
117117
warn!(
@@ -124,7 +124,7 @@ impl RebalanceLockManager {
124124
lock_mqs
125125
}
126126

127-
pub fn unlock_batch(&self, group: &str, mqs: Vec<Arc<MessageQueue>>, client_id: &str) {
127+
pub fn unlock_batch(&self, group: &str, mqs: &HashSet<MessageQueue>, client_id: &str) {
128128
let mut write_guard = self.mq_lock_table.write();
129129
let group_value = write_guard.get_mut(group);
130130
if group_value.is_none() {
@@ -163,7 +163,7 @@ impl RebalanceLockManager {
163163
}
164164
}
165165

166-
fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool {
166+
fn is_locked(&self, group: &str, mq: &MessageQueue, client_id: &str) -> bool {
167167
let lock_table = self.mq_lock_table.read();
168168
let group_value = lock_table.get(group);
169169
if group_value.is_none() {
@@ -231,59 +231,71 @@ mod rebalance_lock_manager_tests {
231231
#[test]
232232
fn lock_all_expired_returns_false_when_active_locks_exist() {
233233
let manager = RebalanceLockManager::default();
234-
let mq = Arc::new(MessageQueue::default());
235-
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
234+
let mq = MessageQueue::default();
235+
let mut set = HashSet::new();
236+
set.insert(mq.clone());
237+
manager.try_lock_batch("test_group", &set, "client_1");
236238
assert!(!manager.is_lock_all_expired("test_group"));
237239
}
238240

239241
#[test]
240242
fn try_lock_batch_locks_message_queues_for_new_group() {
241243
let manager = RebalanceLockManager::default();
242-
let mq = Arc::new(MessageQueue::default());
243-
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
244+
let mq = MessageQueue::default();
245+
let mut set = HashSet::new();
246+
set.insert(mq.clone());
247+
let locked_mqs = manager.try_lock_batch("test_group", &set, "client_1");
244248
assert_eq!(locked_mqs.len(), 1);
245249
}
246250

247251
#[test]
248252
fn try_lock_batch_does_not_lock_already_locked_message_queues() {
249253
let manager = RebalanceLockManager::default();
250-
let mq = Arc::new(MessageQueue::default());
251-
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
252-
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
254+
let mq = MessageQueue::default();
255+
let mut set = HashSet::new();
256+
set.insert(mq.clone());
257+
manager.try_lock_batch("test_group", &set, "client_1");
258+
let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2");
253259
assert!(locked_mqs.is_empty());
254260
}
255261

256262
#[test]
257263
fn unlock_batch_unlocks_message_queues_locked_by_client() {
258264
let manager = RebalanceLockManager::default();
259-
let mq = Arc::new(MessageQueue::default());
260-
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
261-
manager.unlock_batch("test_group", vec![mq.clone()], "client_1");
262-
let locked_mqs = manager.try_lock_batch("test_group", vec![mq.clone()], "client_2");
265+
let mq = MessageQueue::default();
266+
let mut set = HashSet::new();
267+
set.insert(mq.clone());
268+
manager.try_lock_batch("test_group", &set, "client_1");
269+
manager.unlock_batch("test_group", &set, "client_1");
270+
let locked_mqs = manager.try_lock_batch("test_group", &set, "client_2");
263271
assert_eq!(locked_mqs.len(), 1);
264272
}
265273

266274
#[test]
267275
fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() {
268276
let manager = RebalanceLockManager::default();
269-
let mq = Arc::new(MessageQueue::default());
270-
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
271-
manager.unlock_batch("test_group", vec![mq.clone()], "client_2");
277+
let mq = MessageQueue::default();
278+
let mut set = HashSet::new();
279+
set.insert(mq.clone());
280+
manager.try_lock_batch("test_group", &set, "client_1");
281+
manager.unlock_batch("test_group", &set, "client_2");
272282
assert!(!manager.is_lock_all_expired("test_group"));
273283
}
274284

275285
#[test]
276286
fn is_locked_returns_true_for_locked_message_queue() {
277287
let manager = RebalanceLockManager::default();
278-
let mq = Arc::new(MessageQueue::default());
279-
manager.try_lock_batch("test_group", vec![mq.clone()], "client_1");
288+
let mq = MessageQueue::default();
289+
let mut set = HashSet::new();
290+
set.insert(mq.clone());
291+
manager.try_lock_batch("test_group", &set, "client_1");
280292
assert!(manager.is_locked("test_group", &mq, "client_1"));
281293
}
282294

283295
#[test]
284296
fn is_locked_returns_false_for_unlocked_message_queue() {
285297
let manager = RebalanceLockManager::default();
286-
let mq = Arc::new(MessageQueue::default());
298+
let mq = MessageQueue::default();
287299
assert!(!manager.is_locked("test_group", &mq, "client_1"));
288300
}
289301
}

rocketmq-broker/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,7 @@ use thiserror::Error;
2020
pub enum BrokerError {
2121
#[error("broker client error: {0}")]
2222
BrokerClientError(#[from] rocketmq_remoting::error::Error),
23+
24+
#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
25+
MQBrokerError(i32, String, String),
2326
}

rocketmq-broker/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ pub(crate) mod topic;
4747
pub(crate) mod util;
4848

4949
type RemotingError = rocketmq_remoting::error::Error;
50-
type BrokerResult<T> = Result<T, BrokerError>;
50+
type Result<T> = std::result::Result<T, BrokerError>;

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::collections::HashSet;
1718
use std::sync::Arc;
1819
use std::sync::Weak;
1920

2021
use dns_lookup::lookup_host;
2122
use rocketmq_common::common::broker::broker_config::BrokerIdentity;
2223
use rocketmq_common::common::config::TopicConfig;
24+
use rocketmq_common::common::message::message_queue::MessageQueue;
2325
use rocketmq_common::utils::crc32_utils;
2426
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
2527
use rocketmq_common::ArcRefCellWrapper;
@@ -29,14 +31,17 @@ use rocketmq_remoting::code::request_code::RequestCode;
2931
use rocketmq_remoting::code::response_code::ResponseCode;
3032
use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody;
3133
use rocketmq_remoting::protocol::body::kv_table::KVTable;
34+
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
3235
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
36+
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
3337
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
3438
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
3539
use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
3640
use rocketmq_remoting::protocol::namesrv::RegisterBrokerResult;
3741
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
3842
use rocketmq_remoting::protocol::route::route_data_view::QueueData;
3943
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
44+
use rocketmq_remoting::protocol::RemotingDeserializable;
4045
use rocketmq_remoting::protocol::RemotingSerializable;
4146
use rocketmq_remoting::remoting::RemotingService;
4247
use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
@@ -48,6 +53,10 @@ use tracing::debug;
4853
use tracing::error;
4954
use tracing::info;
5055

56+
use crate::error::BrokerError;
57+
use crate::error::BrokerError::BrokerClientError;
58+
use crate::Result;
59+
5160
pub struct BrokerOuterAPI {
5261
remoting_client: ArcRefCellWrapper<RocketmqDefaultClient<DefaultRemotingRequestProcessor>>,
5362
name_server_address: Option<String>,
@@ -317,6 +326,39 @@ impl BrokerOuterAPI {
317326
pub fn rpc_client(&self) -> &RpcClientImpl {
318327
&self.rpc_client
319328
}
329+
330+
pub async fn lock_batch_mq_async(
331+
&self,
332+
addr: String,
333+
request_body: bytes::Bytes,
334+
timeout_millis: u64,
335+
) -> Result<HashSet<MessageQueue>> {
336+
let mut request = RemotingCommand::create_request_command(
337+
RequestCode::LockBatchMq,
338+
LockBatchMqRequestHeader::default(),
339+
);
340+
request.set_body_mut_ref(Some(request_body));
341+
let result = self
342+
.remoting_client
343+
.invoke_async(Some(addr), request, timeout_millis)
344+
.await;
345+
match result {
346+
Ok(response) => {
347+
if ResponseCode::from(response.code()) == ResponseCode::Success {
348+
let lock_batch_response_body =
349+
LockBatchResponseBody::decode(response.get_body().unwrap()).unwrap();
350+
Ok(lock_batch_response_body.lock_ok_mq_set)
351+
} else {
352+
Err(BrokerError::MQBrokerError(
353+
response.code(),
354+
response.remark().cloned().unwrap_or("".to_string()),
355+
"".to_string(),
356+
))
357+
}
358+
}
359+
Err(e) => Err(BrokerClientError(e)),
360+
}
361+
}
320362
}
321363

322364
fn dns_lookup_address_by_domain(domain: &str) -> Vec<String> {

0 commit comments

Comments
 (0)