Skip to content

Commit 0f6030f

Browse files
authored
[ISSUE #1125]🔊Implementing functionality similar to Java's LinkedBlockingQueue in Rust using Tokio🚀 (#1126)
1 parent fac3627 commit 0f6030f

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::VecDeque;
19+
20+
use tokio::sync::Mutex;
21+
use tokio::sync::Notify;
22+
use tokio::time;
23+
24+
pub struct BlockingQueue<T> {
25+
queue: Mutex<VecDeque<T>>,
26+
capacity: usize,
27+
notify: Notify,
28+
}
29+
30+
impl<T> BlockingQueue<T> {
31+
pub fn new(capacity: usize) -> Self {
32+
BlockingQueue {
33+
queue: Mutex::new(VecDeque::with_capacity(capacity)),
34+
capacity,
35+
notify: Notify::new(),
36+
}
37+
}
38+
39+
pub async fn put(&self, item: T) {
40+
loop {
41+
{
42+
let mut queue = self.queue.lock().await;
43+
if queue.len() < self.capacity {
44+
queue.push_back(item);
45+
self.notify.notify_one(); // Notify only after successful push
46+
return;
47+
}
48+
}
49+
self.notify.notified().await;
50+
}
51+
}
52+
53+
pub async fn offer(&self, item: T, timeout: std::time::Duration) -> bool {
54+
time::timeout(timeout, self.put(item)).await.is_ok()
55+
}
56+
57+
pub async fn take(&self) -> T {
58+
loop {
59+
{
60+
let mut queue = self.queue.lock().await;
61+
if let Some(item) = queue.pop_front() {
62+
self.notify.notify_one(); // Notify only after successful pop
63+
return item;
64+
}
65+
}
66+
self.notify.notified().await;
67+
}
68+
}
69+
70+
pub async fn poll(&self, timeout: std::time::Duration) -> Option<T> {
71+
time::timeout(timeout, self.take()).await.ok()
72+
}
73+
}

‎rocketmq/src/lib.rs‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
#![feature(sync_unsafe_cell)]
1818

1919
mod arc_mut;
20+
mod blocking_queue;
2021
pub mod count_down_latch;
2122
pub mod rocketmq_tokio_lock;
2223
mod shutdown;
2324

2425
pub use arc_mut::ArcMut;
2526
pub use arc_mut::SyncUnsafeCellWrapper;
2627
pub use arc_mut::WeakArcMut;
28+
pub use blocking_queue::BlockingQueue as RocketMQBlockingQueue;
2729
pub use count_down_latch::CountDownLatch;
2830
/// Re-export rocketmq main.
2931
pub use rocketmq::main;

0 commit comments

Comments
 (0)