Skip to content

TDD: Track session state post-close access in tests #1905

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 70 additions & 19 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
#[cfg(feature = "test")]
use std::ops::DerefMut;
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
Expand Down Expand Up @@ -525,10 +527,49 @@ pub trait Undeclarable<S = ()>: UndeclarableSealed<S> {}

impl<T, S> Undeclarable<S> for T where T: UndeclarableSealed<S> {}

#[cfg(feature = "test")]
pub(crate) struct InvalidatableSessionState {
inner: Option<SessionState>,
}

#[cfg(feature = "test")]
impl InvalidatableSessionState {
fn new(state: SessionState) -> Self {
Self { inner: Some(state) }
}

fn invalidate(&mut self) {
let _ = self.inner.take();
}
}

#[cfg(feature = "test")]
impl DerefMut for InvalidatableSessionState {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner
.as_mut()
.expect("referring to invalidated (closed) session state")
}
}

#[cfg(feature = "test")]
impl Deref for InvalidatableSessionState {
type Target = SessionState;

fn deref(&self) -> &Self::Target {
self.inner
.as_ref()
.expect("referring to invalidated (closed) session state")
}
}

pub(crate) struct SessionInner {
/// See [`WeakSession`] doc
weak_counter: Mutex<usize>,
pub(crate) runtime: Runtime,
#[cfg(feature = "test")]
pub(crate) state: RwLock<InvalidatableSessionState>,
#[cfg(not(feature = "test"))]
pub(crate) state: RwLock<SessionState>,
pub(crate) id: u16,
owns_runtime: bool,
Expand Down Expand Up @@ -686,11 +727,15 @@ impl Session {
let publisher_qos = config.0.qos().publication().clone();
let namespace = config.0.namespace().clone();
drop(config);
let state = RwLock::new(SessionState::new(
let state = SessionState::new(
aggregated_subscribers,
aggregated_publishers,
publisher_qos.into(),
));
);
#[cfg(feature = "test")]
let state = InvalidatableSessionState::new(state);
let state = RwLock::new(state);

let session = Session(Arc::new(SessionInner {
weak_counter: Mutex::new(0),
runtime: runtime.clone(),
Expand Down Expand Up @@ -3185,27 +3230,33 @@ impl Closee for Arc<SessionInner> {
primitives.send_close();
}

// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
let _queries = std::mem::take(&mut state.queries);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _queriers = std::mem::take(&mut state.queriers);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
}
#[cfg(feature = "test")]
zwrite!(self.state).invalidate();
}
}

Expand Down
Loading