From 2806cefdc50fcb6fb6b52f178dc4a23099af4ae1 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Wed, 9 Apr 2025 14:41:19 +0800 Subject: [PATCH 1/3] refactor: add Display impl; debug() when being dropped --- src/desc.rs | 74 +++++++++++++++++++++++++++++++ src/dispatch/dispatcher.rs | 26 ++++++++--- src/dispatch/dispatcher_handle.rs | 3 +- src/lib.rs | 3 ++ src/testing.rs | 49 ++++++++++++++++++++ src/watch_stream/sender.rs | 20 ++++++++- src/watch_stream/stream.rs | 2 + tests/it/dispatcher.rs | 35 +++++++++++---- 8 files changed, 197 insertions(+), 15 deletions(-) create mode 100644 src/testing.rs diff --git a/src/desc.rs b/src/desc.rs index f0f1869..e8760d0 100644 --- a/src/desc.rs +++ b/src/desc.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::Bound; +use std::fmt; + use crate::event_filter::EventFilter; use crate::id::WatcherId; use crate::type_config::TypeConfig; @@ -49,3 +52,74 @@ where C: TypeConfig } } } + +impl fmt::Display for WatchDesc +where + C: TypeConfig, + C::Key: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "(id:{} {} ", self.watcher_id, self.interested)?; + + match &self.key_range.0 { + Bound::Included(v) => { + write!(f, "[{}", v)?; + } + Bound::Excluded(v) => { + write!(f, "({}", v)?; + } + Bound::Unbounded => { + write!(f, "(-∞")?; + } + } + + match &self.key_range.1 { + Bound::Included(v) => { + write!(f, ", {}]", v)?; + } + Bound::Excluded(v) => { + write!(f, ", {})", v)?; + } + Bound::Unbounded => { + write!(f, ", +∞)")?; + } + } + + write!(f, ")") + } +} + +#[cfg(test)] +mod tests { + use std::ops::Bound; + + use super::*; + use crate::testing::UTTypes; + + #[test] + fn test_watch_desc_display() { + let desc = WatchDesc::::new( + 1, + EventFilter::all(), + ( + Bound::Included("a".to_string()), + Bound::Excluded("z".to_string()), + ), + ); + assert_eq!(format!("{}", desc), r#"(id:1 update|delete [a, z))"#); + + let desc = WatchDesc::::new( + 1, + EventFilter::delete(), + (Bound::Excluded("a".to_string()), Bound::Unbounded), + ); + assert_eq!(format!("{}", desc), r#"(id:1 delete (a, +∞))"#); + + let desc = WatchDesc::::new( + 1, + EventFilter::update(), + (Bound::Unbounded, Bound::Included("a".to_string())), + ); + assert_eq!(format!("{}", desc), r#"(id:1 update (-∞, a])"#); + } +} diff --git a/src/dispatch/dispatcher.rs b/src/dispatch/dispatcher.rs index 96a7875..e9876de 100644 --- a/src/dispatch/dispatcher.rs +++ b/src/dispatch/dispatcher.rs @@ -14,7 +14,9 @@ use std::collections::BTreeSet; use std::sync::Arc; +use std::sync::Weak; +use log::debug; use log::info; use log::warn; use span_map::SpanMap; @@ -47,6 +49,14 @@ where C: TypeConfig current_watcher_id: WatcherId, } +impl Drop for Dispatcher +where C: TypeConfig +{ + fn drop(&mut self) { + debug!("watch-event-Dispatcher is dropped"); + } +} + impl Dispatcher where C: TypeConfig { @@ -83,7 +93,7 @@ where C: TypeConfig } } - info!("watch-event-Dispatcher: all event senders are closed. quit."); + info!("watch-event-Dispatcher: all event senders are closed(dropped). quit"); } /// Dispatch a kv change event to interested watchers. @@ -97,7 +107,13 @@ where C: TypeConfig let mut removed = vec![]; + debug!("watch-event-Dispatcher: dispatch event {:?}", kv_change); + for sender in self.watchers.get(&kv_change.0) { + debug!( + "watch-event-Dispatcher: dispatch event to watcher {:?}", + sender + ); let interested = sender.desc.interested; if !interested.accepts_event_type(event_type) { @@ -107,8 +123,8 @@ where C: TypeConfig let resp = C::new_response(kv_change.clone()); if let Err(_err) = sender.send(resp).await { warn!( - "watch-event-Dispatcher: fail to send to watcher {}; close this stream", - sender.desc.watcher_id + "watch-event-Dispatcher: fail to send to watcher {:?}; close this stream", + sender ); removed.push(sender.clone()); }; @@ -124,7 +140,7 @@ where C: TypeConfig rng: KeyRange, filter: EventFilter, tx: mpsc::Sender>, - ) -> Arc> { + ) -> Weak> { info!( "watch-event-Dispatcher::add_watcher: range: {:?}, filter: {}", rng, filter @@ -139,7 +155,7 @@ where C: TypeConfig C::update_watcher_metrics(1); - stream_sender + Arc::downgrade(&stream_sender) } fn new_watch_desc(&mut self, key_range: KeyRange, interested: EventFilter) -> WatchDesc { diff --git a/src/dispatch/dispatcher_handle.rs b/src/dispatch/dispatcher_handle.rs index a29925c..fb48c3f 100644 --- a/src/dispatch/dispatcher_handle.rs +++ b/src/dispatch/dispatcher_handle.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::sync::Weak; use log::error; use tokio::sync::mpsc; @@ -65,7 +66,7 @@ where C: TypeConfig key_range: KeyRange, filter: EventFilter, tx: mpsc::Sender>, - ) -> Result>, &'static str> { + ) -> Result>, &'static str> { self.request_blocking(move |dispatcher| dispatcher.add_watcher(key_range, filter, tx)) .await .map_err(|_| "Failed to add watcher; watch-Dispatcher may be closed") diff --git a/src/lib.rs b/src/lib.rs index 666d22d..1989185 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,9 @@ pub mod type_config; pub mod util; pub mod watch_stream; +#[cfg(test)] +pub(crate) mod testing; + use std::collections::Bound; pub use event_filter::EventFilter; diff --git a/src/testing.rs b/src/testing.rs new file mode 100644 index 0000000..1788ced --- /dev/null +++ b/src/testing.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::io; + +use crate::type_config::KVChange; +use crate::type_config::TypeConfig; + +// Only Debug is actually needed for the test framework +#[derive(Debug, Copy, Clone)] +pub(crate) struct UTTypes {} + +impl TypeConfig for UTTypes { + type Key = String; + type Value = String; + type Response = (String, Option, Option); + type Error = io::Error; + + fn new_response(change: KVChange) -> Self::Response { + change + } + + fn data_error(error: io::Error) -> Self::Error { + error + } + + fn update_watcher_metrics(_delta: i64) {} + + #[allow(clippy::disallowed_methods)] + fn spawn(_fut: T) + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + todo!() + } +} diff --git a/src/watch_stream/sender.rs b/src/watch_stream/sender.rs index 6403011..6374099 100644 --- a/src/watch_stream/sender.rs +++ b/src/watch_stream/sender.rs @@ -16,6 +16,7 @@ use std::cmp::Ordering; use std::fmt; use std::fmt::Formatter; +use log::debug; use tokio::sync::mpsc; use tokio::sync::mpsc::error::SendError; @@ -28,7 +29,6 @@ use crate::WatchResult; /// The stream sender is responsible for sending watch events through the stream /// to the client-side watcher. It encapsulates the communication channel between /// the server's event source and the client's watch request. -#[derive(Clone)] pub struct WatchStreamSender where C: TypeConfig { @@ -36,6 +36,14 @@ where C: TypeConfig tx: mpsc::Sender>, } +impl Drop for WatchStreamSender +where C: TypeConfig +{ + fn drop(&mut self) { + debug!("WatchStreamSender({:?}) dropped", self.desc,); + } +} + impl fmt::Debug for WatchStreamSender where C: TypeConfig { @@ -44,6 +52,16 @@ where C: TypeConfig } } +impl fmt::Display for WatchStreamSender +where + C: TypeConfig, + C::Key: fmt::Display, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "WatchStreamSender({})", self.desc) + } +} + impl PartialEq for WatchStreamSender where C: TypeConfig { diff --git a/src/watch_stream/stream.rs b/src/watch_stream/stream.rs index 2199988..eafd762 100644 --- a/src/watch_stream/stream.rs +++ b/src/watch_stream/stream.rs @@ -18,6 +18,7 @@ use std::task::Context; use std::task::Poll; use futures::Stream; +use log::debug; use tokio::sync::mpsc::Receiver; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. @@ -35,6 +36,7 @@ impl fmt::Debug for WatchStream { impl Drop for WatchStream { fn drop(&mut self) { + debug!("WatchStream is dropped"); let Some(on_drop) = self.on_drop.take() else { return; }; diff --git a/tests/it/dispatcher.rs b/tests/it/dispatcher.rs index d367ae1..5a7c1d1 100644 --- a/tests/it/dispatcher.rs +++ b/tests/it/dispatcher.rs @@ -177,10 +177,16 @@ async fn test_dispatcher_overlapping_ranges() { .await; let watcher1_clone = watcher1.clone(); - handle.remove_watcher(watcher1_clone).await.unwrap(); + handle + .remove_watcher(watcher1_clone.upgrade().unwrap()) + .await + .unwrap(); let watcher2_clone = watcher2.clone(); - handle.remove_watcher(watcher2_clone).await.unwrap(); + handle + .remove_watcher(watcher2_clone.upgrade().unwrap()) + .await + .unwrap(); } #[tokio::test] @@ -229,8 +235,12 @@ async fn test_dispatcher_basic_functionality() { expect_no_event(&mut rx2, "w2 should not receive (filter excludes DELETE)").await; // Remove first watcher + let watcher1_strong = watcher1.upgrade().unwrap(); let watcher1_clone = watcher1.clone(); - handle.remove_watcher(watcher1_clone).await.unwrap(); + handle + .remove_watcher(watcher1_clone.upgrade().unwrap()) + .await + .unwrap(); tokio::time::sleep(REMOVAL_DELAY).await; drop(rx1); @@ -241,11 +251,17 @@ async fn test_dispatcher_basic_functionality() { // Verify watcher status let senders = all_senders(&handle).await; assert_eq!(senders.len(), 1); - assert!(!senders.contains(&watcher1), "w1 should be removed"); - assert!(senders.contains(&watcher2), "w2 should still exist"); + assert!(!senders.contains(&watcher1_strong), "w1 should be removed"); + assert!( + senders.contains(&watcher2.upgrade().unwrap()), + "w2 should still exist" + ); let watcher2_clone = watcher2.clone(); - handle.remove_watcher(watcher2_clone).await.unwrap(); + handle + .remove_watcher(watcher2_clone.upgrade().unwrap()) + .await + .unwrap(); } #[tokio::test] @@ -272,13 +288,16 @@ async fn test_dispatcher_watch_senders() { let senders = all_senders(&handle).await; assert_eq!(senders.len(), 3); for watcher in &watchers { - assert!(senders.contains(watcher)); + assert!(senders.contains(&watcher.upgrade().unwrap())); } // Clean up for watcher in watchers.clone() { let watcher_clone = watcher.clone(); - handle.remove_watcher(watcher_clone).await.unwrap(); + handle + .remove_watcher(watcher_clone.upgrade().unwrap()) + .await + .unwrap(); } let senders = all_senders(&handle).await; From 73381344c4dd5871bde726dc5d83e9667572e5da Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Wed, 9 Apr 2025 14:41:48 +0800 Subject: [PATCH 2/3] chore: Bump ver: v0.1.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a45967b..0bb5bd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "watcher" description = "subscribe data changes" -version = "0.1.0" +version = "0.1.1" authors = ["Databend Authors "] license = "Apache-2.0" edition = "2021" From de30194284b8382f4a2727ebdb7fbd4dd53e39b1 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Wed, 9 Apr 2025 14:47:03 +0800 Subject: [PATCH 3/3] chore: silent warn about mutable key type --- tests/it/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/it/main.rs b/tests/it/main.rs index bfb82a9..ed4ef16 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -12,4 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(clippy::mutable_key_type)] + mod dispatcher;