Skip to content

Commit a9462a0

Browse files
committed
Rework operation cancellation
Motivation ---------- Rather than reading from a channel for cancellation we can just pass around references to the opaque and operations can remove themselves on cancellation. Changes -------- Add opaque map to client pending op and pass in a clone during dispatch. Remove the cancellation sender/receiver from client.
1 parent 18b51dd commit a9462a0

File tree

2 files changed

+36
-56
lines changed

2 files changed

+36
-56
lines changed

sdk/couchbase-core/src/memdx/client.rs

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ use crate::memdx::error::{CancellationErrorKind, Error, ErrorKind};
3131
use crate::memdx::packet::{RequestPacket, ResponsePacket};
3232
use crate::memdx::pendingop::ClientPendingOp;
3333

34-
type ResponseSender = Sender<error::Result<ClientResponse>>;
35-
type OpaqueMap = HashMap<u32, Arc<ResponseSender>>;
36-
pub(crate) type CancellationSender = UnboundedSender<(u32, CancellationErrorKind)>;
34+
pub(crate) type ResponseSender = Sender<error::Result<ClientResponse>>;
35+
pub(crate) type OpaqueMap = HashMap<u32, Arc<ResponseSender>>;
3736

3837
#[derive(Debug)]
3938
struct ReadLoopOptions {
@@ -65,8 +64,6 @@ pub struct Client {
6564

6665
writer: Mutex<FramedWrite<WriteHalf<TcpStream>, KeyValueCodec>>,
6766
read_handle: Mutex<ClientReadHandle>,
68-
69-
cancel_tx: CancellationSender,
7067
close_tx: Sender<()>,
7168

7269
local_addr: Option<SocketAddr>,
@@ -101,12 +98,10 @@ impl Client {
10198

10299
async fn on_read_loop_close(
103100
stream: FramedRead<ReadHalf<TcpStream>, KeyValueCodec>,
104-
op_cancel_rx: UnboundedReceiver<(u32, CancellationErrorKind)>,
105101
opaque_map: MutexGuard<'_, OpaqueMap>,
106102
on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
107103
) {
108104
drop(stream);
109-
drop(op_cancel_rx);
110105

111106
Self::drain_opaque_map(opaque_map).await;
112107

@@ -117,44 +112,16 @@ impl Client {
117112

118113
async fn read_loop(
119114
mut stream: FramedRead<ReadHalf<TcpStream>, KeyValueCodec>,
120-
mut op_cancel_rx: UnboundedReceiver<(u32, CancellationErrorKind)>,
121115
opaque_map: Arc<Mutex<OpaqueMap>>,
122116
mut opts: ReadLoopOptions,
123117
) {
124118
loop {
125119
select! {
126120
(_) = opts.on_client_close_rx.recv() => {
127121
let guard = opaque_map.lock().await;
128-
Self::on_read_loop_close(stream, op_cancel_rx, guard, opts.on_connection_close_tx).await;
122+
Self::on_read_loop_close(stream, guard, opts.on_connection_close_tx).await;
129123
return;
130124
},
131-
(cancel_reason) = op_cancel_rx.recv() => {
132-
match cancel_reason {
133-
Some(cancel_info) => {
134-
let requests: Arc<Mutex<OpaqueMap>> = Arc::clone(&opaque_map);
135-
let mut map = requests.lock().await;
136-
137-
let t = map.remove(&cancel_info.0);
138-
139-
if let Some(map_entry) = t {
140-
let sender = Arc::clone(&map_entry);
141-
drop(map);
142-
143-
sender
144-
.send(Err(ErrorKind::Cancelled(cancel_info.1).into()))
145-
.await
146-
.unwrap();
147-
} else {
148-
drop(map);
149-
}
150-
151-
drop(requests);
152-
}
153-
None => {
154-
return;
155-
}
156-
}
157-
},
158125
(next) = stream.next() => {
159126
match next {
160127
Some(input) => {
@@ -230,7 +197,7 @@ impl Client {
230197
}
231198
None => {
232199
let guard = opaque_map.lock().await;
233-
Self::on_read_loop_close(stream, op_cancel_rx, guard, opts.on_connection_close_tx).await;
200+
Self::on_read_loop_close(stream, guard, opts.on_connection_close_tx).await;
234201
return;
235202
}
236203
}
@@ -260,7 +227,6 @@ impl Dispatcher for Client {
260227

261228
let uuid = Uuid::new_v4().to_string();
262229

263-
let (cancel_tx, cancel_rx) = mpsc::unbounded_channel();
264230
let (close_tx, close_rx) = mpsc::channel::<()>(1);
265231

266232
let opaque_map = Arc::new(Mutex::new(OpaqueMap::default()));
@@ -271,7 +237,6 @@ impl Dispatcher for Client {
271237
let read_handle = tokio::spawn(async move {
272238
Client::read_loop(
273239
reader,
274-
cancel_rx,
275240
read_opaque_map,
276241
ReadLoopOptions {
277242
client_id: read_uuid,
@@ -290,7 +255,6 @@ impl Dispatcher for Client {
290255
opaque_map,
291256
client_id: uuid,
292257

293-
cancel_tx,
294258
close_tx,
295259

296260
writer: Mutex::new(writer),
@@ -313,7 +277,7 @@ impl Dispatcher for Client {
313277
match writer.send(packet).await {
314278
Ok(_) => Ok(ClientPendingOp::new(
315279
opaque,
316-
self.cancel_tx.clone(),
280+
self.opaque_map.clone(),
317281
response_rx,
318282
)),
319283
Err(e) => {

sdk/couchbase-core/src/memdx/pendingop.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
use std::future::Future;
22
use std::marker::PhantomData;
3-
use std::sync::atomic::AtomicBool;
3+
use std::sync::Arc;
44

5-
use log::debug;
6-
use tokio::select;
75
use tokio::sync::mpsc::Receiver;
6+
use tokio::sync::Mutex;
87
use tokio::time::{Instant, timeout_at};
98

10-
use crate::memdx::client::CancellationSender;
9+
use crate::memdx::client::OpaqueMap;
1110
use crate::memdx::client_response::ClientResponse;
1211
use crate::memdx::error::{CancellationErrorKind, ErrorKind};
13-
use crate::memdx::error::Error;
1412
use crate::memdx::error::Result;
1513
use crate::memdx::response::TryFromClientResponse;
1614

@@ -27,19 +25,19 @@ pub(crate) trait OpCanceller {
2725

2826
pub struct ClientPendingOp {
2927
opaque: u32,
30-
cancel_chan: CancellationSender,
3128
response_receiver: Receiver<Result<ClientResponse>>,
29+
opaque_map: Arc<Mutex<OpaqueMap>>,
3230
}
3331

3432
impl ClientPendingOp {
3533
pub(crate) fn new(
3634
opaque: u32,
37-
cancel_chan: CancellationSender,
35+
opaque_map: Arc<Mutex<OpaqueMap>>,
3836
response_receiver: Receiver<Result<ClientResponse>>,
3937
) -> Self {
4038
ClientPendingOp {
4139
opaque,
42-
cancel_chan,
40+
opaque_map,
4341
response_receiver,
4442
}
4543
}
@@ -51,13 +49,31 @@ impl ClientPendingOp {
5149
}
5250
}
5351

54-
pub fn cancel(&mut self, e: CancellationErrorKind) {
55-
match self.cancel_chan.send((self.opaque, e)) {
56-
Ok(_) => {}
57-
Err(e) => {
58-
debug!("Failed to send cancel to channel {}", e);
59-
}
60-
};
52+
pub async fn cancel(&mut self, e: CancellationErrorKind) {
53+
// match self.cancel_chan.send((self.opaque, e)) {
54+
// Ok(_) => {}
55+
// Err(e) => {
56+
// debug!("Failed to send cancel to channel {}", e);
57+
// }
58+
// };
59+
let requests: Arc<Mutex<OpaqueMap>> = Arc::clone(&self.opaque_map);
60+
let mut map = requests.lock().await;
61+
62+
let t = map.remove(&self.opaque);
63+
64+
if let Some(map_entry) = t {
65+
let sender = Arc::clone(&map_entry);
66+
drop(map);
67+
68+
sender
69+
.send(Err(ErrorKind::Cancelled(e).into()))
70+
.await
71+
.unwrap();
72+
} else {
73+
drop(map);
74+
}
75+
76+
drop(requests);
6177
}
6278
}
6379

0 commit comments

Comments
 (0)