Skip to content

Commit ff14e13

Browse files
authored
Next minor update (#37)
* use uuid as listener_id instead of a string * fmt cargo.toml & ignore bacon files * relax trait bounds on emit method * add extra utils methods and map listener map private * update deps * add global_listener for all events * fix format check in ci * update documentation * add a test for global listener
1 parent e487c7e commit ff14e13

File tree

6 files changed

+203
-88
lines changed

6 files changed

+203
-88
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
with:
2121
toolchain: stable
2222

23-
- run: cargo fmt --all -- --check
23+
- run: cargo +stable fmt --all -- --check
2424
test:
2525
name: lib tests and docs test
2626
runs-on: ubuntu-latest

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@
22
/Cargo.lock
33
*.idea/
44
.vscode
5-
*.profraw
5+
*.profraw
6+
## ignore bacon files
7+
.bacon*

Cargo.toml

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,35 @@ name = "async-event-emitter"
33
version = "0.1.4"
44
edition = "2021"
55
description = "Lightweight AsyncEventEmitter"
6-
authors = [ "Spencer Najib", "Dylan Kerler" ]
7-
keywords = [ "event-emitter", "tokio", "async-rust", "futures", "bincode" ]
6+
authors = ["Spencer Najib", "Dylan Kerler"]
7+
keywords = ["event-emitter", "tokio", "async-rust", "futures", "bincode"]
88
license = "MIT"
99
repository = "https://github.com/spencerjibz/async-event-emitter-rs"
1010
homepage = "https://github.com/spencerjibz/async-event-emitter-rs"
11-
categories = [ "asynchronous", "web-programming" ]
11+
categories = ["asynchronous", "web-programming"]
1212
readme = "./README.md"
1313

1414
[dependencies]
15-
anyhow = "1.0.95"
15+
anyhow = "1.0.99"
1616
bincode = "1.3.3"
1717
futures = "0.3.31"
1818
lazy_static = "1.5.0"
1919

20-
[dependencies.dashmap]
21-
version = "6.1.0"
22-
default-features = false
20+
[dependencies.dashmap]
21+
version = "6.1.0"
22+
default-features = false
2323

24-
[dependencies.serde]
25-
version = "1.0.217"
26-
features = [ "derive" ]
27-
default-features = false
24+
[dependencies.serde]
25+
version = "1.0.219"
26+
features = ["derive"]
27+
default-features = false
2828

29-
[dependencies.uuid]
30-
version = "1.13.1"
31-
features = [ "v4" ]
32-
default-features = false
29+
[dependencies.uuid]
30+
version = "1.18.0"
31+
features = ["v4"]
32+
default-features = false
3333

3434
[dev-dependencies.tokio]
35-
version = "1.43.0"
36-
features = [ "rt", "macros", "rt-multi-thread" ]
35+
version = "1.47.1"
36+
features = ["rt", "macros", "rt-multi-thread"]
3737
default-features = false

README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Events are in the form of (strings, value) and callbacks are in the form of clos
1414
#### Differences between this crate and [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs)
1515
- This is an async implementation that works for all common async runtimes (Tokio, async-std and smol)
1616
- The listener methods ***(on and once)*** take a callback that returns a future instead of a merely a closure.
17-
- The emit methods executes each callback on each event by spawning a tokio task instead of a std::thread
17+
- The emit methods executes each callback on each event by spawning intra-task instead of a std::thread.
1818
- This emitter is thread safe and can also be used lock-free (supports interior mutability).
1919

2020
***Note***: To use strict return and event types, use [typed-emitter](https://crates.io/crates/typed-emitter), that crate solves [this issue](https://github.com/spencerjibz/async-event-emitter-rs/issues/31) too.
@@ -103,7 +103,21 @@ Removing listeners is also easy
103103
None => println!("No event listener of that id exists"),
104104
}
105105
```
106+
Listening to all emitted events with a single listener
107+
```rust
108+
use async_event_emitter::AsyncEventEmitter as EventEmitter;
109+
#[tokio::main]
110+
async fn main() {
111+
let mut event_emitter = EventEmitter::new();
112+
// this will print Hello world two because of
113+
event_emitter.on_all(|value: i32| async move { println!("Hello world! - {value}") });
114+
// >> "Hello world! - 1"
115+
// >> "Hello world! - 2"
116+
event_emitter.emit("Some event", 1).await;
117+
event_emitter.emit("next event", 2).await;
118+
}
106119

120+
```
107121
#### Creating a Global EventEmitter
108122

109123
You'll likely want to have a single EventEmitter instance that can be shared across files;

src/lib.rs

Lines changed: 145 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@
88
## Differences between this crate and [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs)
99
- This is an async implementation that works for all common async runtimes (Tokio, async-std and smol)
1010
- The listener methods ***(on and once)*** take a callback that returns a future instead of a merely a closure.
11-
- The emit methods executes each callback on each event by spawning a tokio task instead of a std::thread
11+
- The emit methods executes each callback on each event by running async intra-task instead of spawning a std::thread
1212
- This emitter is thread safe and can also be used lock-free (supports interior mutability).
1313
14-
1514
***Note***: To use strict return and event types, use [typed-emitter](https://crates.io/crates/typed-emitter), that crate solves [this issue](https://github.com/spencerjibz/async-event-emitter-rs/issues/31) too.
1615
1716
## Getting Started
@@ -70,11 +69,28 @@
7069
let event_emitter = EventEmitter::new();
7170
7271
let listener_id = event_emitter.on("Hello", |_: ()| async {println!("Hello World")});
73-
match event_emitter.remove_listener(&listener_id) {
72+
match event_emitter.remove_listener(listener_id) {
7473
Some(listener_id) => print!("Removed event listener!"),
7574
None => print!("No event listener of that id exists")
7675
}
7776
```
77+
78+
Listening to all emitted events with a single listener
79+
80+
```rust
81+
use async_event_emitter::AsyncEventEmitter as EventEmitter;
82+
#[tokio::main]
83+
async fn main() {
84+
let mut event_emitter = EventEmitter::new();
85+
// this will print Hello world two because of
86+
event_emitter.on_all(|value: i32| async move { println!("Hello world! - {value}") });
87+
// >> "Hello world! - 1"
88+
// >> "Hello world! - 2"
89+
event_emitter.emit("Some event", 1).await;
90+
event_emitter.emit("next event", 2).await;
91+
}
92+
93+
```
7894
## Creating a Global EventEmitter
7995
8096
It's likely that you'll want to have a single EventEmitter instance that can be shared across files;
@@ -121,23 +137,75 @@ use futures::StreamExt;
121137
use serde::{Deserialize, Serialize};
122138
use uuid::Uuid;
123139
pub type AsyncCB = dyn Fn(Vec<u8>) -> BoxFuture<'static, ()> + Send + Sync + 'static;
124-
use std::sync::Arc;
140+
use std::sync::{Arc, Mutex};
125141
#[derive(Clone)]
126142
pub struct AsyncListener {
127143
pub callback: Arc<AsyncCB>,
128144
pub limit: Option<u64>,
129-
pub id: String,
145+
pub id: Uuid,
130146
}
131147

132148
#[derive(Default, Clone)]
133149
pub struct AsyncEventEmitter {
134-
pub listeners: DashMap<String, Vec<AsyncListener>>,
150+
listeners: DashMap<String, Vec<AsyncListener>>,
151+
all_listener: Arc<Mutex<Option<AsyncListener>>>,
135152
}
136153

137154
impl AsyncEventEmitter {
138155
pub fn new() -> Self {
139156
Self::default()
140157
}
158+
/// Returns the numbers of events
159+
/// # Example
160+
///
161+
/// ```rust
162+
/// use async_event_emitter::AsyncEventEmitter;
163+
/// let event_emitter = AsyncEventEmitter::new();
164+
/// event_emitter.event_count(); // returns 0
165+
/// ```
166+
pub fn event_count(&self) -> usize {
167+
self.listeners.len()
168+
}
169+
170+
/// Returns all listeners on the specified event
171+
/// # Example
172+
/// ```rust
173+
/// use async_event_emitter::AsyncEventEmitter;
174+
/// #[tokio::main]
175+
/// async fn main() {
176+
/// let emitter = AsyncEventEmitter::new();
177+
/// emitter.on("test", |value: ()| async { println!("Hello world!") });
178+
/// emitter.emit("test", ()).await;
179+
/// let listeners = emitter.listeners_by_event("test");
180+
/// println!("{listeners:?}");
181+
/// }
182+
/// ```
183+
pub fn listeners_by_event(&self, event: &str) -> Vec<AsyncListener> {
184+
if let Some(listeners) = self.listeners.get(event) {
185+
let values = listeners.to_vec();
186+
return values;
187+
}
188+
vec![]
189+
}
190+
/// Returns the numbers of listners per event
191+
/// # Example
192+
///
193+
/// ```rust
194+
/// use async_event_emitter::AsyncEventEmitter;
195+
/// #[tokio::main]
196+
/// async fn main() {
197+
/// let emitter = AsyncEventEmitter::new();
198+
/// emitter.on("test", |value: ()| async { println!("Hello world!") });
199+
/// emitter.emit("test", ()).await;
200+
/// emitter.listener_count_by_event("test"); // returns 1
201+
/// }
202+
/// ```
203+
pub fn listener_count_by_event(&self, event: &str) -> usize {
204+
if let Some(listeners) = self.listeners.get(event) {
205+
return listeners.len();
206+
}
207+
0
208+
}
141209

142210
/// Emits an event of the given parameters and executes each callback that is listening to that event asynchronously by spawning a task for each callback.
143211
///
@@ -156,9 +224,9 @@ impl AsyncEventEmitter {
156224
/// Ok(())
157225
/// }
158226
/// ```
159-
pub async fn emit<'a, T>(&self, event: &str, value: T) -> anyhow::Result<()>
227+
pub async fn emit<T>(&self, event: &str, value: T) -> anyhow::Result<()>
160228
where
161-
T: Serialize + Deserialize<'a> + Send + Sync + 'a,
229+
T: Serialize,
162230
{
163231
let mut futures: FuturesUnordered<_> = FuturesUnordered::new();
164232

@@ -191,6 +259,12 @@ impl AsyncEventEmitter {
191259
}
192260
}
193261

262+
if let Some(global_listener) = self.all_listener.lock().unwrap().as_ref() {
263+
let callback = global_listener.callback.clone();
264+
let bytes: Vec<u8> = bincode::serialize(&value)?;
265+
futures.push(callback(bytes));
266+
}
267+
194268
while futures.next().await.is_some() {}
195269
Ok(())
196270
}
@@ -204,23 +278,26 @@ impl AsyncEventEmitter {
204278
/// let event_emitter = AsyncEventEmitter::new();
205279
/// let listener_id =
206280
/// event_emitter.on("Some event", |value: ()| async { println!("Hello world!") });
207-
/// println!("{:?}", event_emitter.listeners);
208-
///
209281
/// // Removes the listener that we just added
210-
/// event_emitter.remove_listener(&listener_id);
282+
/// event_emitter.remove_listener(listener_id);
211283
/// ```
212-
pub fn remove_listener(&self, id_to_delete: &str) -> Option<String> {
284+
pub fn remove_listener(&self, id_to_delete: Uuid) -> Option<Uuid> {
213285
for mut mut_ref in self.listeners.iter_mut() {
214286
let event_listeners = mut_ref.value_mut();
215287
if let Some(index) = event_listeners
216288
.iter()
217289
.position(|listener| listener.id == id_to_delete)
218290
{
219291
event_listeners.remove(index);
220-
return Some(id_to_delete.to_string());
292+
return Some(id_to_delete);
293+
}
294+
}
295+
let mut all_listener = self.all_listener.lock().unwrap();
296+
if let Some(listener) = all_listener.as_ref() {
297+
if id_to_delete == listener.id {
298+
all_listener.take();
221299
}
222300
}
223-
224301
None
225302
}
226303

@@ -242,13 +319,13 @@ impl AsyncEventEmitter {
242319
/// event_emitter.emit("Some event", ()).await; // 4 >> <Nothing happens here because listener was deleted after the 3rd call>
243320
/// }
244321
/// ```
245-
pub fn on_limited<F, T, C>(&self, event: &str, limit: Option<u64>, callback: C) -> String
322+
pub fn on_limited<F, T, C>(&self, event: &str, limit: Option<u64>, callback: C) -> Uuid
246323
where
247324
for<'de> T: Deserialize<'de>,
248325
C: Fn(T) -> F + Send + Sync + 'static,
249326
F: Future<Output = ()> + Send + Sync + 'static,
250327
{
251-
let id = Uuid::new_v4().to_string();
328+
let id = Uuid::new_v4();
252329
let parsed_callback = move |bytes: Vec<u8>| {
253330
let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
254331
panic!(
@@ -261,7 +338,7 @@ impl AsyncEventEmitter {
261338
};
262339

263340
let listener = AsyncListener {
264-
id: id.clone(),
341+
id,
265342
limit,
266343
callback: Arc::new(parsed_callback),
267344
};
@@ -294,7 +371,7 @@ impl AsyncEventEmitter {
294371
/// event_emitter.emit("Some event", ());
295372
/// // >> <Nothing happens here since listener was deleted>
296373
/// ```
297-
pub fn once<F, T, C>(&self, event: &str, callback: C) -> String
374+
pub fn once<F, T, C>(&self, event: &str, callback: C) -> Uuid
298375
where
299376
for<'de> T: Deserialize<'de>,
300377
C: Fn(T) -> F + Send + Sync + 'static,
@@ -317,14 +394,63 @@ impl AsyncEventEmitter {
317394
/// // MUST also match the type that is being emitted (here we just use a throwaway `()` type since we don't care about using the `value`)
318395
/// event_emitter.on("Some event", |value: ()| async { println!("Hello world!")});
319396
/// ```
320-
pub fn on<F, T, C>(&self, event: &str, callback: C) -> String
397+
pub fn on<F, T, C>(&self, event: &str, callback: C) -> Uuid
321398
where
322399
for<'de> T: Deserialize<'de>,
323400
C: Fn(T) -> F + Send + Sync + 'static,
324401
F: Future<Output = ()> + Send + Sync + 'static,
325402
{
326403
self.on_limited(event, None, callback)
327404
}
405+
/// Adds an event listener called for whenever every event is called
406+
/// Returns the id of the newly added listener.
407+
///
408+
/// # Example
409+
/// ```rust
410+
/// use async_event_emitter::AsyncEventEmitter;
411+
/// #[tokio::main]
412+
/// async fn main() {
413+
/// let mut event_emitter = AsyncEventEmitter::new();
414+
/// // this will print Hello world two because of
415+
/// event_emitter.on_all(|value: ()| async { println!("Hello world!") });
416+
/// event_emitter.emit("Some event", ()).await;
417+
/// // >> "Hello world!"
418+
///
419+
/// event_emitter.emit("next event", ()).await;
420+
/// // >> <Nothing happens here since listener was deleted>
421+
/// }
422+
/// ```
423+
pub fn on_all<F, T, C>(&self, callback: C) -> Uuid
424+
where
425+
for<'de> T: Deserialize<'de>,
426+
C: Fn(T) -> F + Send + Sync + 'static,
427+
F: Future<Output = ()> + Send + Sync + 'static,
428+
{
429+
assert!(
430+
self.all_listener.lock().unwrap().is_none(),
431+
"only one global listener is allowed"
432+
);
433+
let id = Uuid::new_v4();
434+
let parsed_callback = move |bytes: Vec<u8>| {
435+
let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
436+
panic!(
437+
" value can't be deserialized into type {}",
438+
std::any::type_name::<T>()
439+
)
440+
});
441+
callback(value).boxed()
442+
};
443+
444+
let listener = AsyncListener {
445+
id,
446+
limit: None,
447+
callback: Arc::new(parsed_callback),
448+
};
449+
450+
self.all_listener.lock().unwrap().replace(listener);
451+
452+
id
453+
}
328454
}
329455

330456
// test the AsyncEventEmitter

0 commit comments

Comments
 (0)