Skip to content

Commit 68f2458

Browse files
committed
refactor: refactor weak session implementation
The previous implementation of `WeakSession` relied on a weak counter wrapped in a mutex. It was unnecessarily complex, as an atomic strong counter does the job better.
1 parent b3ccf82 commit 68f2458

File tree

1 file changed

+8
-31
lines changed

1 file changed

+8
-31
lines changed

zenoh/src/api/session.rs

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ use std::{
1717
collections::HashMap,
1818
convert::TryInto,
1919
fmt,
20+
mem::ManuallyDrop,
2021
ops::Deref,
2122
sync::{
22-
atomic::{AtomicU16, Ordering},
23-
Arc, Mutex, RwLock,
23+
atomic::{AtomicU16, AtomicUsize, Ordering},
24+
Arc, RwLock,
2425
},
2526
time::{Duration, SystemTime, UNIX_EPOCH},
2627
};
@@ -527,7 +528,7 @@ impl<T, S> Undeclarable<S> for T where T: UndeclarableSealed<S> {}
527528

528529
pub(crate) struct SessionInner {
529530
/// See [`WeakSession`] doc
530-
weak_counter: Mutex<usize>,
531+
strong_counter: AtomicUsize,
531532
pub(crate) runtime: Runtime,
532533
pub(crate) state: RwLock<SessionState>,
533534
pub(crate) id: u16,
@@ -573,16 +574,14 @@ impl fmt::Debug for Session {
573574

574575
impl Clone for Session {
575576
fn clone(&self) -> Self {
576-
let _weak = self.0.weak_counter.lock().unwrap();
577+
self.0.strong_counter.fetch_add(1, Ordering::Relaxed);
577578
Self(self.0.clone())
578579
}
579580
}
580581

581582
impl Drop for Session {
582583
fn drop(&mut self) {
583-
let weak = self.0.weak_counter.lock().unwrap();
584-
if Arc::strong_count(&self.0) == *weak + /* the `Arc` currently dropped */ 1 {
585-
drop(weak);
584+
if self.0.strong_counter.fetch_sub(1, Ordering::Relaxed) == 1 {
586585
if let Err(error) = self.close().wait() {
587586
tracing::error!(error)
588587
}
@@ -596,32 +595,17 @@ impl Drop for Session {
596595
/// When all `Session` instance are dropped, [`Session::close`] is be called and cleans
597596
/// the reference cycles, allowing the underlying `Arc` to be properly reclaimed.
598597
///
599-
/// The pseudo-weak algorithm relies on a counter wrapped in a mutex. It was indeed the simplest
600-
/// to implement it, because atomic manipulations to achieve this semantic would not have been
601-
/// trivial at all — what could happen if a pseudo-weak is cloned while the last session instance
602-
/// is dropped? With a mutex, it's simple, and it works perfectly fine, as we don't care about the
603-
/// performance penalty when it comes to session entities cloning/dropping.
604-
///
605598
/// (Although it was planed to be used initially, `Weak` was in fact causing errors in the session
606599
/// closing, because the primitive implementation seemed to be used in the closing operation.)
600+
#[derive(Clone)]
607601
pub(crate) struct WeakSession(Arc<SessionInner>);
608602

609603
impl WeakSession {
610604
fn new(session: &Arc<SessionInner>) -> Self {
611-
let mut weak = session.weak_counter.lock().unwrap();
612-
*weak += 1;
613605
Self(session.clone())
614606
}
615607
}
616608

617-
impl Clone for WeakSession {
618-
fn clone(&self) -> Self {
619-
let mut weak = self.0.weak_counter.lock().unwrap();
620-
*weak += 1;
621-
Self(self.0.clone())
622-
}
623-
}
624-
625609
impl fmt::Debug for WeakSession {
626610
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
627611
self.0.fmt(f)
@@ -636,13 +620,6 @@ impl Deref for WeakSession {
636620
}
637621
}
638622

639-
impl Drop for WeakSession {
640-
fn drop(&mut self) {
641-
let mut weak = self.0.weak_counter.lock().unwrap();
642-
*weak -= 1;
643-
}
644-
}
645-
646623
/// Error indicating the operation cannot proceed because the session is closed.
647624
///
648625
/// It may be returned by operations like [`Session::get`] or [`Publisher::put`](crate::api::publisher::Publisher::put) when
@@ -677,7 +654,7 @@ impl Session {
677654
publisher_qos.into(),
678655
));
679656
let session = Session(Arc::new(SessionInner {
680-
weak_counter: Mutex::new(0),
657+
strong_counter: AtomicUsize::new(1),
681658
runtime: runtime.clone(),
682659
state,
683660
id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst),

0 commit comments

Comments
 (0)