Skip to content

Commit 271c335

Browse files
committed
[Refactor♻️]Refactor MessageQueueOpContext
#3386
1 parent 2e81447 commit 271c335

File tree

2 files changed

+56
-45
lines changed

2 files changed

+56
-45
lines changed

rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,9 @@ where
131131
let mut send_map = HashMap::<i32, Message>::new();
132132
let delete_context_mutex_guard = self.delete_context.lock().await;
133133
for (queue_id, mq_context) in delete_context_mutex_guard.iter() {
134-
if mq_context.get_total_size() <= 0
135-
|| mq_context.context_queue().is_empty().await
136-
|| (mq_context.get_total_size() < max_size as i32
134+
if mq_context.get_total_size().await <= 0
135+
|| mq_context.is_empty().await
136+
|| (mq_context.get_total_size().await < max_size as u32
137137
&& (start_time as i64 - mq_context.get_last_write_timestamp().await as i64)
138138
< interval as i64)
139139
{
@@ -145,7 +145,7 @@ where
145145
}
146146
send_map.insert(*queue_id, op_message.unwrap());
147147
first_timestamp = first_timestamp.min(mq_context.get_last_write_timestamp().await);
148-
if mq_context.get_total_size() >= max_size as i32 {
148+
if mq_context.get_total_size().await >= max_size as u32 {
149149
over_size = true;
150150
}
151151
}
@@ -172,8 +172,8 @@ where
172172
more_data: Option<String>,
173173
) -> Option<Message> {
174174
let topic = TransactionalMessageUtil::build_op_topic();
175-
let delete_context = self.delete_context.lock().await;
176-
let mq_context = delete_context.get(&queue_id)?;
175+
let mut delete_context = self.delete_context.lock().await;
176+
let mq_context = delete_context.get_mut(&queue_id)?;
177177

178178
let more_data_length = if let Some(ref data) = more_data {
179179
data.len()
@@ -187,7 +187,7 @@ where
187187
.broker_config()
188188
.transaction_op_msg_max_size as usize;
189189
if length < max_size {
190-
let sz = mq_context.get_total_size() as usize;
190+
let sz = mq_context.get_total_size().await as usize;
191191
if sz > max_size || length + sz > max_size {
192192
length = max_size + 100;
193193
} else {
@@ -201,12 +201,12 @@ where
201201
sb.push_str(&data);
202202
}
203203

204-
while !mq_context.context_queue().is_empty().await {
204+
while !mq_context.is_empty().await {
205205
if sb.len() >= max_size {
206206
break;
207207
}
208208
{
209-
if let Some(data) = mq_context.context_queue().try_poll().await {
209+
if let Ok(data) = mq_context.pull().await {
210210
sb.push_str(&data);
211211
}
212212
}
@@ -1069,17 +1069,16 @@ where
10691069
);
10701070
let len = data.len();
10711071
let res = mq_context
1072-
.context_queue()
10731072
.offer(data.clone(), Duration::from_millis(100))
10741073
.await;
1075-
if res {
1076-
let total_size = mq_context.total_size_add_and_get(len as i32);
1074+
if res.is_ok() {
1075+
let total_size = mq_context.total_size_add_and_get(len as u32).await;
10771076
if total_size
10781077
> self
10791078
.transactional_message_bridge
10801079
.broker_runtime_inner
10811080
.broker_config()
1082-
.transaction_op_msg_max_size
1081+
.transaction_op_msg_max_size as u32
10831082
{
10841083
if let Some(batch_service) = &self.transactional_op_batch_service {
10851084
batch_service.wakeup();
Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,71 @@
1-
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one or more
3-
* contributor license agreements. See the NOTICE file distributed with
4-
* this work for additional information regarding copyright ownership.
5-
* The ASF licenses this file to You under the Apache License, Version 2.0
6-
* (the "License"); you may not use this file except in compliance with
7-
* the License. You may obtain a copy of the License at
8-
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
10-
*
11-
* Unless required by applicable law or agreed to in writing, software
12-
* distributed under the License is distributed on an "AS IS" BASIS,
13-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14-
* See the License for the specific language governing permissions and
15-
* limitations under the License.
16-
*/
17-
use std::sync::atomic::AtomicI32;
1+
use std::sync::atomic::AtomicU32;
2+
use std::sync::atomic::AtomicU64;
183
use std::sync::atomic::Ordering;
194
use std::sync::Arc;
205

21-
use rocketmq_rust::RocketMQBlockingQueue;
22-
use tokio::sync::Mutex;
6+
use anyhow::Error;
7+
use rocketmq_error::Result;
8+
use tokio::sync::mpsc;
9+
use tokio::time;
2310

2411
pub struct MessageQueueOpContext {
25-
total_size: AtomicI32,
26-
last_write_timestamp: Arc<Mutex<u64>>,
27-
context_queue: RocketMQBlockingQueue<String>,
12+
total_size: AtomicU32,
13+
last_write_timestamp: AtomicU64,
14+
context_queue: Arc<mpsc::UnboundedSender<String>>,
15+
context_receiver: mpsc::UnboundedReceiver<String>,
16+
queue_capacity: usize,
2817
}
2918

3019
impl MessageQueueOpContext {
3120
pub fn new(timestamp: u64, queue_length: usize) -> Self {
21+
let unbounded_channel = mpsc::unbounded_channel::<String>();
3222
MessageQueueOpContext {
33-
total_size: AtomicI32::new(0),
34-
last_write_timestamp: Arc::new(Mutex::new(timestamp)),
35-
context_queue: RocketMQBlockingQueue::new(queue_length),
23+
total_size: AtomicU32::new(0),
24+
last_write_timestamp: AtomicU64::new(timestamp),
25+
context_queue: Arc::new(unbounded_channel.0),
26+
context_receiver: unbounded_channel.1,
27+
queue_capacity: queue_length,
3628
}
3729
}
3830

39-
pub fn get_total_size(&self) -> i32 {
31+
pub async fn get_total_size(&self) -> u32 {
4032
self.total_size.load(Ordering::Relaxed)
4133
}
4234

43-
pub fn total_size_add_and_get(&self, delta: i32) -> i32 {
35+
pub async fn total_size_add_and_get(&self, delta: u32) -> u32 {
4436
self.total_size.fetch_add(delta, Ordering::AcqRel) + delta
4537
}
4638

4739
pub async fn get_last_write_timestamp(&self) -> u64 {
48-
*self.last_write_timestamp.lock().await
40+
self.last_write_timestamp.load(Ordering::Relaxed)
4941
}
5042

5143
pub async fn set_last_write_timestamp(&self, timestamp: u64) {
52-
let mut last_timestamp = self.last_write_timestamp.lock().await;
53-
*last_timestamp = timestamp;
44+
self.last_write_timestamp
45+
.store(timestamp, Ordering::Release);
5446
}
5547

56-
pub fn context_queue(&self) -> &RocketMQBlockingQueue<String> {
57-
&self.context_queue
48+
pub async fn push(&self, msg: String) -> Result<()> {
49+
if self.context_receiver.len() > self.queue_capacity {
50+
return Err(anyhow::Error::msg("queue is full".to_string()));
51+
}
52+
self.context_queue
53+
.send(msg)
54+
.map_err(|e| anyhow::Error::new(e))
55+
}
56+
pub async fn offer(&self, item: String, timeout: std::time::Duration) -> Result<()> {
57+
time::timeout(timeout, self.push(item)).await.unwrap()
58+
}
59+
pub async fn pull(&mut self) -> Result<String> {
60+
if let Some(item) = self.context_receiver.recv().await {
61+
return Ok(item);
62+
}
63+
Err(Error::msg("pull failed, queue is empty".to_string()))
64+
}
65+
pub async fn is_empty(&self) -> bool {
66+
if self.context_receiver.len() == 0 {
67+
return true;
68+
}
69+
false
5870
}
5971
}

0 commit comments

Comments
 (0)