Skip to content

Commit 57ce1b2

Browse files
committed
fix ci
1 parent 271c335 commit 57ce1b2

File tree

2 files changed

+2
-4
lines changed

2 files changed

+2
-4
lines changed

rocketmq-broker/src/transaction/queue/default_transactional_message_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ where
131131
let mut send_map = HashMap::<i32, Message>::new();
132132
let delete_context_mutex_guard = self.delete_context.lock().await;
133133
for (queue_id, mq_context) in delete_context_mutex_guard.iter() {
134-
if mq_context.get_total_size().await <= 0
134+
if mq_context.get_total_size().await == 0
135135
|| mq_context.is_empty().await
136136
|| (mq_context.get_total_size().await < max_size as u32
137137
&& (start_time as i64 - mq_context.get_last_write_timestamp().await as i64)

rocketmq-broker/src/transaction/queue/message_queue_op_context.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ impl MessageQueueOpContext {
4949
if self.context_receiver.len() > self.queue_capacity {
5050
return Err(anyhow::Error::msg("queue is full".to_string()));
5151
}
52-
self.context_queue
53-
.send(msg)
54-
.map_err(|e| anyhow::Error::new(e))
52+
self.context_queue.send(msg).map_err(anyhow::Error::new)
5553
}
5654
pub async fn offer(&self, item: String, timeout: std::time::Duration) -> Result<()> {
5755
time::timeout(timeout, self.push(item)).await.unwrap()

0 commit comments

Comments
 (0)