Skip to content

Commit 2829254

Browse files
committed
Update memdx channel usage to use callbacks where possible
Motivation ----------- Using channels for things like orphan handler requires us to spin up new tasks, the flow is also a bit easier to follow with callbacks. Changes ------- Update orphan handling and connection close handling to use callbacks. Rewrite kvclientpool to make this possible.
1 parent a9462a0 commit 2829254

File tree

9 files changed

+280
-345
lines changed

9 files changed

+280
-345
lines changed

sdk/couchbase-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ async-trait = "0.1.80"
2525
tokio-io = { version = "0.2.0-alpha.6", features = ["util"] }
2626
crc32fast = "1.4.2"
2727
serde_json = "1.0.120"
28+
arc-swap = "1.7"
2829

2930
[dev-dependencies]
3031
env_logger = "0.11"

sdk/couchbase-core/src/configwatcher.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ mod tests {
200200
use std::time::Duration;
201201

202202
use tokio::sync::broadcast;
203-
use tokio::sync::mpsc::unbounded_channel;
204203
use tokio::time::sleep;
205204

206205
use crate::authenticator::PasswordAuthenticator;
@@ -213,7 +212,6 @@ mod tests {
213212
};
214213
use crate::kvclientpool::NaiveKvClientPool;
215214
use crate::memdx::client::Client;
216-
use crate::memdx::packet::ResponsePacket;
217215

218216
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
219217
async fn fetches_configs() {
@@ -245,28 +243,13 @@ mod tests {
245243
clients: client_configs,
246244
};
247245

248-
let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
249-
250-
tokio::spawn(async move {
251-
loop {
252-
match orphan_rx.recv().await {
253-
Some(resp) => {
254-
dbg!("unexpected orphan", resp);
255-
}
256-
None => {
257-
return;
258-
}
259-
}
260-
}
261-
});
262-
263246
let manager: StdKvClientManager<NaiveKvClientPool<StdKvClient<Client>>> =
264247
StdKvClientManager::new(
265248
manger_config,
266249
KvClientManagerOptions {
267250
connect_timeout: Default::default(),
268251
connect_throttle_period: Default::default(),
269-
orphan_handler: Arc::new(orphan_tx),
252+
orphan_handler: Arc::new(|_| {}),
270253
},
271254
)
272255
.await

sdk/couchbase-core/src/crudcomponent.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ mod tests {
138138
use std::sync::Arc;
139139
use std::time::Duration;
140140

141-
use tokio::sync::mpsc::unbounded_channel;
142141
use tokio::time::Instant;
143142

144143
use crate::authenticator::PasswordAuthenticator;
@@ -151,7 +150,6 @@ mod tests {
151150
};
152151
use crate::kvclientpool::NaiveKvClientPool;
153152
use crate::memdx::client::Client;
154-
use crate::memdx::packet::ResponsePacket;
155153
use crate::vbucketmap::VbucketMap;
156154
use crate::vbucketrouter::{
157155
NotMyVbucketConfigHandler, StdVbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
@@ -171,21 +169,6 @@ mod tests {
171169

172170
let instant = Instant::now().add(Duration::new(7, 0));
173171

174-
let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
175-
176-
tokio::spawn(async move {
177-
loop {
178-
match orphan_rx.recv().await {
179-
Some(resp) => {
180-
dbg!("unexpected orphan", resp);
181-
}
182-
None => {
183-
return;
184-
}
185-
}
186-
}
187-
});
188-
189172
let client_config = KvClientConfig {
190173
address: "192.168.107.128:11210"
191174
.parse()
@@ -220,7 +203,7 @@ mod tests {
220203
KvClientManagerOptions {
221204
connect_timeout: Default::default(),
222205
connect_throttle_period: Default::default(),
223-
orphan_handler: Arc::new(orphan_tx),
206+
orphan_handler: Arc::new(|_| {}),
224207
},
225208
)
226209
.await

sdk/couchbase-core/src/kvclient.rs

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use std::future::Future;
22
use std::net::SocketAddr;
3-
use std::ops::{Add, Deref};
3+
use std::ops::{Add, AsyncFn, Deref};
44
use std::sync::Arc;
55
use std::sync::atomic::{AtomicBool, Ordering};
66
use std::time::Duration;
77

8-
use tokio::sync::{Mutex, oneshot};
9-
use tokio::sync::mpsc::UnboundedSender;
8+
use futures::future::BoxFuture;
9+
use tokio::sync::Mutex;
1010
use tokio::time::Instant;
1111
use tokio_rustls::rustls::RootCertStore;
1212
use uuid::Uuid;
@@ -16,11 +16,10 @@ use crate::error::Error;
1616
use crate::error::Result;
1717
use crate::memdx::auth_mechanism::AuthMechanism;
1818
use crate::memdx::connection::{Connection, ConnectOptions};
19-
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions};
19+
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions, OrphanResponseHandler};
2020
use crate::memdx::hello_feature::HelloFeature;
2121
use crate::memdx::op_auth_saslauto::SASLAuthAutoOptions;
2222
use crate::memdx::op_bootstrap::BootstrapOptions;
23-
use crate::memdx::packet::ResponsePacket;
2423
use crate::memdx::request::{GetErrorMapRequest, HelloRequest, SelectBucketRequest};
2524
use crate::service_type::ServiceType;
2625

@@ -53,9 +52,12 @@ impl PartialEq for KvClientConfig {
5352
}
5453
}
5554

55+
pub(crate) type OnKvClientCloseHandler =
56+
Arc<dyn Fn(String) -> BoxFuture<'static, ()> + Send + Sync>;
57+
5658
pub(crate) struct KvClientOptions {
57-
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
58-
pub on_close_tx: Option<UnboundedSender<String>>,
59+
pub orphan_handler: OrphanResponseHandler,
60+
pub on_close: OnKvClientCloseHandler,
5961
}
6062

6163
pub(crate) trait KvClient: Sized + PartialEq + Send + Sync {
@@ -178,16 +180,26 @@ where
178180
));
179181
}
180182

181-
let (connection_close_tx, mut connection_close_rx) =
182-
oneshot::channel::<crate::memdx::error::Result<()>>();
183+
let closed = Arc::new(AtomicBool::new(false));
184+
let closed_clone = closed.clone();
185+
let id = Uuid::new_v4().to_string();
186+
let read_id = id.clone();
187+
188+
let on_close = opts.on_close.clone();
183189
let memdx_client_opts = DispatcherOptions {
184-
on_connection_close_handler: Some(connection_close_tx),
190+
on_connection_close_handler: Arc::new(move || {
191+
// There's not much to do when the connection closes so just mark us as closed.
192+
closed_clone.store(true, Ordering::SeqCst);
193+
let on_close = on_close.clone();
194+
let read_id = read_id.clone();
195+
196+
Box::pin(async move {
197+
on_close(read_id).await;
198+
})
199+
}),
185200
orphan_handler: opts.orphan_handler,
186201
};
187202

188-
let closed = Arc::new(AtomicBool::new(false));
189-
let closed_clone = closed.clone();
190-
191203
let conn = Connection::connect(
192204
config.address,
193205
ConnectOptions {
@@ -205,7 +217,6 @@ where
205217
let local_addr = *conn.local_addr();
206218

207219
let mut cli = D::new(conn, memdx_client_opts);
208-
let id = Uuid::new_v4().to_string();
209220

210221
let mut kv_cli = StdKvClient {
211222
remote_addr,
@@ -218,18 +229,6 @@ where
218229
id: id.clone(),
219230
};
220231

221-
tokio::spawn(async move {
222-
// There's not much to do when the connection closes so just mark us as closed.
223-
if connection_close_rx.await.is_ok() {
224-
closed_clone.store(true, Ordering::SeqCst);
225-
};
226-
227-
if let Some(mut tx) = opts.on_close_tx {
228-
// TODO: Probably log on failure.
229-
tx.send(id).unwrap_or_default();
230-
}
231-
});
232-
233232
if should_bootstrap {
234233
if let Some(b) = &bootstrap_select_bucket {
235234
let mut guard = kv_cli.selected_bucket.lock().await;
@@ -309,7 +308,7 @@ where
309308
.await
310309
{
311310
Ok(_) => {}
312-
Err(e) => {
311+
Err(_e) => {
313312
let mut current_bucket = self.selected_bucket.lock().await;
314313
*current_bucket = None;
315314
drop(current_bucket);
@@ -362,14 +361,12 @@ mod tests {
362361
use std::sync::Arc;
363362
use std::time::Duration;
364363

365-
use tokio::sync::mpsc::unbounded_channel;
366364
use tokio::time::Instant;
367365

368366
use crate::authenticator::PasswordAuthenticator;
369367
use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions, StdKvClient};
370368
use crate::kvclient_ops::KvClientOps;
371369
use crate::memdx::client::Client;
372-
use crate::memdx::packet::ResponsePacket;
373370
use crate::memdx::request::{GetRequest, SetRequest};
374371

375372
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -378,21 +375,6 @@ mod tests {
378375

379376
let instant = Instant::now().add(Duration::new(7, 0));
380377

381-
let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
382-
383-
tokio::spawn(async move {
384-
loop {
385-
match orphan_rx.recv().await {
386-
Some(resp) => {
387-
dbg!("unexpected orphan", resp);
388-
}
389-
None => {
390-
return;
391-
}
392-
}
393-
}
394-
});
395-
396378
let client_config = KvClientConfig {
397379
address: "192.168.107.128:11210"
398380
.parse()
@@ -416,8 +398,10 @@ mod tests {
416398
let mut client = StdKvClient::<Client>::new(
417399
client_config,
418400
KvClientOptions {
419-
orphan_handler: Arc::new(orphan_tx),
420-
on_close_tx: None,
401+
orphan_handler: Arc::new(|packet| {
402+
dbg!("unexpected orphan", packet);
403+
}),
404+
on_close: Arc::new(|id| Box::pin(async {})),
421405
},
422406
)
423407
.await

sdk/couchbase-core/src/kvclientmanager.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@ use std::future::Future;
33
use std::sync::Arc;
44
use std::time::Duration;
55

6-
use tokio::sync::mpsc::UnboundedSender;
76
use tokio::sync::Mutex;
87

98
use crate::error::ErrorKind;
109
use crate::error::Result;
1110
use crate::kvclient::{KvClient, KvClientConfig};
1211
use crate::kvclient_ops::KvClientOps;
1312
use crate::kvclientpool::{KvClientPool, KvClientPoolConfig, KvClientPoolOptions};
14-
use crate::memdx::packet::ResponsePacket;
13+
use crate::memdx::dispatcher::OrphanResponseHandler;
1514

1615
pub(crate) type KvClientManagerClientType<M> =
1716
<<M as KvClientManager>::Pool as KvClientPool>::Client;
@@ -46,11 +45,11 @@ pub(crate) struct KvClientManagerConfig {
4645
pub clients: HashMap<String, KvClientConfig>,
4746
}
4847

49-
#[derive(Debug, Clone)]
48+
#[derive(Clone)]
5049
pub(crate) struct KvClientManagerOptions {
5150
pub connect_timeout: Duration,
5251
pub connect_throttle_period: Duration,
53-
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
52+
pub orphan_handler: OrphanResponseHandler,
5453
}
5554

5655
#[derive(Debug)]

0 commit comments

Comments
 (0)