Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions rocketmq/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ where
T: Clone,
{
/// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
pub fn new(notify: broadcast::Receiver<T>) -> Shutdown<T> {
Shutdown {
pub fn new(capacity: usize) -> (Shutdown<T>, broadcast::Sender<T>) {
let (tx, _) = broadcast::channel(capacity);
let shutdown = Shutdown {
is_shutdown: false,
notify,
}
notify: tx.subscribe(),
};
(shutdown, tx)
}

/// Returns `true` if the shutdown signal has been received.
Expand Down Expand Up @@ -63,33 +65,38 @@ where

#[cfg(test)]
mod tests {
use tokio::sync::broadcast;

use super::*;

#[tokio::test]
async fn shutdown_initial_state() {
let (_, rx) = broadcast::channel::<()>(1);
let shutdown = Shutdown::new(rx);
async fn shutdown_signal_received() {
let (mut shutdown, sender) = Shutdown::new(1);
sender.send(()).unwrap();
shutdown.recv().await;
assert!(shutdown.is_shutdown());
}

#[tokio::test]
async fn shutdown_signal_not_received() {
let (shutdown, _) = Shutdown::<()>::new(1);
assert!(!shutdown.is_shutdown());
}

#[tokio::test]
async fn shutdown_signal_received() {
let (tx, rx) = broadcast::channel::<()>(1);
let mut shutdown = Shutdown::new(rx);
tx.send(()).unwrap();
shutdown.recv().await;
assert!(shutdown.is_shutdown());
async fn shutdown_signal_multiple_receivers() {
let (mut shutdown1, sender) = Shutdown::new(1);
sender.send(()).unwrap();
shutdown1.recv().await;

assert!(shutdown1.is_shutdown());
Comment on lines +86 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Test shutdown_signal_multiple_receivers does not test multiple receivers

The test shutdown_signal_multiple_receivers creates only one Shutdown instance (shutdown1) and verifies that it receives the shutdown signal. To properly test multiple receivers, consider creating additional Shutdown instances by subscribing to the same sender, and ensure that each one receives the shutdown signal.

Apply this diff to enhance the test:

 #[tokio::test]
 async fn shutdown_signal_multiple_receivers() {
     let (mut shutdown1, sender) = Shutdown::new(1);
+    let mut shutdown2 = Shutdown {
+        is_shutdown: false,
+        notify: sender.subscribe(),
+    };
     sender.send(()).unwrap();
     shutdown1.recv().await;
+    shutdown2.recv().await;
 
     assert!(shutdown1.is_shutdown());
+    assert!(shutdown2.is_shutdown());
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn shutdown_signal_multiple_receivers() {
let (mut shutdown1, sender) = Shutdown::new(1);
sender.send(()).unwrap();
shutdown1.recv().await;
assert!(shutdown1.is_shutdown());
async fn shutdown_signal_multiple_receivers() {
let (mut shutdown1, sender) = Shutdown::new(1);
let mut shutdown2 = Shutdown {
is_shutdown: false,
notify: sender.subscribe(),
};
sender.send(()).unwrap();
shutdown1.recv().await;
shutdown2.recv().await;
assert!(shutdown1.is_shutdown());
assert!(shutdown2.is_shutdown());

}

#[tokio::test]
async fn shutdown_signal_already_received() {
let (tx, rx) = broadcast::channel::<()>(1);
let mut shutdown = Shutdown::new(rx);
tx.send(()).unwrap();
shutdown.recv().await;
let (mut shutdown, sender) = Shutdown::new(1);
sender.send(()).unwrap();
shutdown.recv().await;
shutdown.recv().await; // Call recv again
assert!(shutdown.is_shutdown());
}
}
Loading