Skip to content

Update memdx channel usage to use callbacks where possible #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/couchbase-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async-trait = "0.1.80"
tokio-io = { version = "0.2.0-alpha.6", features = ["util"] }
crc32fast = "1.4.2"
serde_json = "1.0.120"
arc-swap = "1.7"

[dev-dependencies]
env_logger = "0.11"
Expand Down
21 changes: 2 additions & 19 deletions sdk/couchbase-core/src/configwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ mod tests {
use std::time::Duration;

use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;
use tokio::time::sleep;

use crate::authenticator::PasswordAuthenticator;
Expand All @@ -213,9 +212,8 @@ mod tests {
};
use crate::kvclientpool::NaiveKvClientPool;
use crate::memdx::client::Client;
use crate::memdx::packet::ResponsePacket;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
async fn fetches_configs() {
let client_config = KvClientConfig {
address: "192.168.107.128:11210"
Expand Down Expand Up @@ -245,28 +243,13 @@ mod tests {
clients: client_configs,
};

let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();

tokio::spawn(async move {
loop {
match orphan_rx.recv().await {
Some(resp) => {
dbg!("unexpected orphan", resp);
}
None => {
return;
}
}
}
});

let manager: StdKvClientManager<NaiveKvClientPool<StdKvClient<Client>>> =
StdKvClientManager::new(
manger_config,
KvClientManagerOptions {
connect_timeout: Default::default(),
connect_throttle_period: Default::default(),
orphan_handler: Arc::new(orphan_tx),
orphan_handler: Arc::new(|_| {}),
},
)
.await
Expand Down
19 changes: 1 addition & 18 deletions sdk/couchbase-core/src/crudcomponent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc::unbounded_channel;
use tokio::time::Instant;

use crate::authenticator::PasswordAuthenticator;
Expand All @@ -151,7 +150,6 @@ mod tests {
};
use crate::kvclientpool::NaiveKvClientPool;
use crate::memdx::client::Client;
use crate::memdx::packet::ResponsePacket;
use crate::vbucketmap::VbucketMap;
use crate::vbucketrouter::{
NotMyVbucketConfigHandler, StdVbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
Expand All @@ -171,21 +169,6 @@ mod tests {

let instant = Instant::now().add(Duration::new(7, 0));

let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();

tokio::spawn(async move {
loop {
match orphan_rx.recv().await {
Some(resp) => {
dbg!("unexpected orphan", resp);
}
None => {
return;
}
}
}
});

let client_config = KvClientConfig {
address: "192.168.107.128:11210"
.parse()
Expand Down Expand Up @@ -220,7 +203,7 @@ mod tests {
KvClientManagerOptions {
connect_timeout: Default::default(),
connect_throttle_period: Default::default(),
orphan_handler: Arc::new(orphan_tx),
orphan_handler: Arc::new(|_| {}),
},
)
.await
Expand Down
78 changes: 31 additions & 47 deletions sdk/couchbase-core/src/kvclient.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::future::Future;
use std::net::SocketAddr;
use std::ops::{Add, Deref};
use std::ops::{Add, AsyncFn, Deref};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use tokio::sync::{Mutex, oneshot};
use tokio::sync::mpsc::UnboundedSender;
use futures::future::BoxFuture;
use tokio::sync::Mutex;
use tokio::time::Instant;
use tokio_rustls::rustls::RootCertStore;
use uuid::Uuid;
Expand All @@ -16,11 +16,10 @@ use crate::error::Error;
use crate::error::Result;
use crate::memdx::auth_mechanism::AuthMechanism;
use crate::memdx::connection::{Connection, ConnectOptions};
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions};
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions, OrphanResponseHandler};
use crate::memdx::hello_feature::HelloFeature;
use crate::memdx::op_auth_saslauto::SASLAuthAutoOptions;
use crate::memdx::op_bootstrap::BootstrapOptions;
use crate::memdx::packet::ResponsePacket;
use crate::memdx::request::{GetErrorMapRequest, HelloRequest, SelectBucketRequest};
use crate::service_type::ServiceType;

Expand Down Expand Up @@ -53,9 +52,12 @@ impl PartialEq for KvClientConfig {
}
}

pub(crate) type OnKvClientCloseHandler =
Arc<dyn Fn(String) -> BoxFuture<'static, ()> + Send + Sync>;

pub(crate) struct KvClientOptions {
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
pub on_close_tx: Option<UnboundedSender<String>>,
pub orphan_handler: OrphanResponseHandler,
pub on_close: OnKvClientCloseHandler,
}

pub(crate) trait KvClient: Sized + PartialEq + Send + Sync {
Expand Down Expand Up @@ -178,16 +180,26 @@ where
));
}

let (connection_close_tx, mut connection_close_rx) =
oneshot::channel::<crate::memdx::error::Result<()>>();
let closed = Arc::new(AtomicBool::new(false));
let closed_clone = closed.clone();
let id = Uuid::new_v4().to_string();
let read_id = id.clone();

let on_close = opts.on_close.clone();
let memdx_client_opts = DispatcherOptions {
on_connection_close_handler: Some(connection_close_tx),
on_connection_close_handler: Arc::new(move || {
// There's not much to do when the connection closes so just mark us as closed.
closed_clone.store(true, Ordering::SeqCst);
let on_close = on_close.clone();
let read_id = read_id.clone();

Box::pin(async move {
on_close(read_id).await;
})
}),
orphan_handler: opts.orphan_handler,
};

let closed = Arc::new(AtomicBool::new(false));
let closed_clone = closed.clone();

let conn = Connection::connect(
config.address,
ConnectOptions {
Expand All @@ -205,7 +217,6 @@ where
let local_addr = *conn.local_addr();

let mut cli = D::new(conn, memdx_client_opts);
let id = Uuid::new_v4().to_string();

let mut kv_cli = StdKvClient {
remote_addr,
Expand All @@ -218,18 +229,6 @@ where
id: id.clone(),
};

tokio::spawn(async move {
// There's not much to do when the connection closes so just mark us as closed.
if connection_close_rx.await.is_ok() {
closed_clone.store(true, Ordering::SeqCst);
};

if let Some(mut tx) = opts.on_close_tx {
// TODO: Probably log on failure.
tx.send(id).unwrap_or_default();
}
});

if should_bootstrap {
if let Some(b) = &bootstrap_select_bucket {
let mut guard = kv_cli.selected_bucket.lock().await;
Expand Down Expand Up @@ -309,7 +308,7 @@ where
.await
{
Ok(_) => {}
Err(e) => {
Err(_e) => {
let mut current_bucket = self.selected_bucket.lock().await;
*current_bucket = None;
drop(current_bucket);
Expand Down Expand Up @@ -362,37 +361,20 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc::unbounded_channel;
use tokio::time::Instant;

use crate::authenticator::PasswordAuthenticator;
use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions, StdKvClient};
use crate::kvclient_ops::KvClientOps;
use crate::memdx::client::Client;
use crate::memdx::packet::ResponsePacket;
use crate::memdx::request::{GetRequest, SetRequest};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
async fn roundtrip_a_request() {
let _ = env_logger::try_init();

let instant = Instant::now().add(Duration::new(7, 0));

let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();

tokio::spawn(async move {
loop {
match orphan_rx.recv().await {
Some(resp) => {
dbg!("unexpected orphan", resp);
}
None => {
return;
}
}
}
});

let client_config = KvClientConfig {
address: "192.168.107.128:11210"
.parse()
Expand All @@ -416,8 +398,10 @@ mod tests {
let mut client = StdKvClient::<Client>::new(
client_config,
KvClientOptions {
orphan_handler: Arc::new(orphan_tx),
on_close_tx: None,
orphan_handler: Arc::new(|packet| {
dbg!("unexpected orphan", packet);
}),
on_close: Arc::new(|id| Box::pin(async {})),
},
)
.await
Expand Down
7 changes: 3 additions & 4 deletions sdk/couchbase-core/src/kvclientmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;

use crate::error::ErrorKind;
use crate::error::Result;
use crate::kvclient::{KvClient, KvClientConfig};
use crate::kvclient_ops::KvClientOps;
use crate::kvclientpool::{KvClientPool, KvClientPoolConfig, KvClientPoolOptions};
use crate::memdx::packet::ResponsePacket;
use crate::memdx::dispatcher::OrphanResponseHandler;

pub(crate) type KvClientManagerClientType<M> =
<<M as KvClientManager>::Pool as KvClientPool>::Client;
Expand Down Expand Up @@ -46,11 +45,11 @@ pub(crate) struct KvClientManagerConfig {
pub clients: HashMap<String, KvClientConfig>,
}

#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct KvClientManagerOptions {
pub connect_timeout: Duration,
pub connect_throttle_period: Duration,
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
pub orphan_handler: OrphanResponseHandler,
}

#[derive(Debug)]
Expand Down
Loading
Loading