Skip to content

Commit 56fc8c7

Browse files
committed
Rework memdx error model
Motivation ---------- The memdx error model does not currently satisfy what we need from it. We need to be able to attach cluster configs and error context to errors sometimes. Changes -------- Rework memdx error model to include more types of error and the top level error be a struct.
1 parent 4edc02d commit 56fc8c7

21 files changed

+383
-247
lines changed

sdk/couchbase-core/src/cbconfig/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::collections::HashMap;
22

33
use serde::Deserialize;
44

5-
#[derive(Deserialize, Debug)]
5+
#[derive(Deserialize, Debug, Clone)]
66
pub struct VBucketServerMap {
77
#[serde(alias = "hashAlgorithm")]
88
pub hash_algorithm: String,
@@ -14,13 +14,13 @@ pub struct VBucketServerMap {
1414
pub vbucket_map: Vec<Vec<i16>>,
1515
}
1616

17-
#[derive(Deserialize, Debug)]
17+
#[derive(Deserialize, Debug, Clone)]
1818
pub struct ConfigDDocs {
1919
#[serde(alias = "uri")]
2020
pub uri: String,
2121
}
2222

23-
#[derive(Deserialize, Debug, Default)]
23+
#[derive(Deserialize, Debug, Default, Clone)]
2424
pub struct TerseExtNodePorts {
2525
#[serde(alias = "kv")]
2626
pub kv: Option<i64>,
@@ -61,23 +61,23 @@ pub struct TerseExtNodePorts {
6161
pub backup_ssl: Option<i64>,
6262
}
6363

64-
#[derive(Deserialize, Debug)]
64+
#[derive(Deserialize, Debug, Clone)]
6565
pub struct TerseExtNodeAltAddresses {
6666
#[serde(alias = "ports")]
6767
pub ports: TerseExtNodePorts,
6868
#[serde(alias = "hostname")]
6969
pub hostname: String,
7070
}
7171

72-
#[derive(Deserialize, Debug)]
72+
#[derive(Deserialize, Debug, Clone)]
7373
pub struct TerseNodePorts {
7474
#[serde(alias = "direct")]
7575
pub direct: Option<u16>,
7676
#[serde(alias = "proxy")]
7777
pub proxy: Option<u16>,
7878
}
7979

80-
#[derive(Deserialize, Debug)]
80+
#[derive(Deserialize, Debug, Clone)]
8181
pub struct TerseNodeConfig {
8282
#[serde(alias = "couchbaseApiBase")]
8383
pub couchbase_api_base: Option<String>,
@@ -87,7 +87,7 @@ pub struct TerseNodeConfig {
8787
pub ports: Option<TerseNodePorts>,
8888
}
8989

90-
#[derive(Deserialize, Debug)]
90+
#[derive(Deserialize, Debug, Clone)]
9191
pub struct TerseNodeExtConfig {
9292
#[serde(alias = "services")]
9393
pub services: Option<TerseExtNodePorts>,
@@ -99,7 +99,7 @@ pub struct TerseNodeExtConfig {
9999
pub alternate_addresses: HashMap<String, TerseExtNodeAltAddresses>,
100100
}
101101

102-
#[derive(Deserialize, Debug)]
102+
#[derive(Deserialize, Debug, Clone)]
103103
pub struct TerseConfig {
104104
#[serde(alias = "rev")]
105105
pub rev: i64,

sdk/couchbase-core/src/error.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
use std::fmt::Display;
22

33
use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper};
4-
use crate::memdx::error::MemdxError;
4+
use crate::memdx::error::{Error, ErrorKind};
55

6-
#[derive(thiserror::Error, Debug, Eq, PartialEq)]
6+
#[derive(thiserror::Error, Debug)]
77
pub enum CoreError {
88
#[error("Dispatch error {0}")]
9-
Dispatch(MemdxError),
9+
Dispatch(Error),
1010
#[error("Placeholder error {0}")]
1111
Placeholder(String),
1212
#[error("Placeholder memdx wrapper error {0}")]
13-
PlaceholderMemdxWrapper(MemdxError),
13+
PlaceholderMemdxWrapper(Error),
1414
}
1515

16-
impl From<MemdxError> for CoreError {
17-
fn from(value: MemdxError) -> Self {
18-
match value {
19-
MemdxError::Dispatch(_) => Dispatch(value),
16+
impl From<Error> for CoreError {
17+
fn from(value: Error) -> Self {
18+
match value.kind.as_ref() {
19+
ErrorKind::Dispatch(_) => Dispatch(value),
2020
_ => PlaceholderMemdxWrapper(value),
2121
}
2222
}

sdk/couchbase-core/src/kvclient.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ where
176176
}
177177

178178
let (connection_close_tx, mut connection_close_rx) =
179-
oneshot::channel::<crate::memdx::client::MemdxResult<()>>();
179+
oneshot::channel::<crate::memdx::error::Result<()>>();
180180
let memdx_client_opts = DispatcherOptions {
181181
on_connection_close_handler: Some(connection_close_tx),
182182
orphan_handler: opts.orphan_handler,

sdk/couchbase-core/src/memdx/auth_mechanism.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::memdx::error::MemdxError;
1+
use crate::memdx::error::{Error, ErrorKind};
22

33
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
44
pub enum AuthMechanism {
@@ -22,7 +22,7 @@ impl From<AuthMechanism> for Vec<u8> {
2222
}
2323

2424
impl TryFrom<&str> for AuthMechanism {
25-
type Error = MemdxError;
25+
type Error = Error;
2626

2727
fn try_from(value: &str) -> Result<Self, Self::Error> {
2828
let mech = match value {
@@ -31,10 +31,10 @@ impl TryFrom<&str> for AuthMechanism {
3131
"SCRAM-SHA256" => AuthMechanism::ScramSha256,
3232
"SCRAM-SHA512" => AuthMechanism::ScramSha512,
3333
_ => {
34-
return Err(MemdxError::Protocol(format!(
35-
"Unknown auth mechanism {}",
36-
value
37-
)));
34+
return Err(ErrorKind::Protocol {
35+
msg: format!("Unknown auth mechanism {}", value),
36+
}
37+
.into());
3838
}
3939
};
4040

sdk/couchbase-core/src/memdx/client.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ use crate::memdx::client_response::ClientResponse;
2626
use crate::memdx::codec::KeyValueCodec;
2727
use crate::memdx::connection::{Connection, ConnectionType};
2828
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions};
29-
use crate::memdx::error::{CancellationErrorKind, MemdxError};
29+
use crate::memdx::error;
30+
use crate::memdx::error::{CancellationErrorKind, Error, ErrorKind};
3031
use crate::memdx::packet::{RequestPacket, ResponsePacket};
3132
use crate::memdx::pendingop::ClientPendingOp;
3233

33-
pub type MemdxResult<T> = std::result::Result<T, MemdxError>;
34-
type ResponseSender = Sender<MemdxResult<ClientResponse>>;
34+
type ResponseSender = Sender<error::Result<ClientResponse>>;
3535
type OpaqueMap = HashMap<u32, Arc<ResponseSender>>;
3636
pub(crate) type CancellationSender = UnboundedSender<(u32, CancellationErrorKind)>;
3737

@@ -41,7 +41,7 @@ struct ReadLoopOptions {
4141
pub local_addr: Option<SocketAddr>,
4242
pub peer_addr: Option<SocketAddr>,
4343
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
44-
pub on_connection_close_tx: Option<oneshot::Sender<MemdxResult<()>>>,
44+
pub on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
4545
pub on_client_close_rx: Receiver<()>,
4646
}
4747

@@ -90,7 +90,10 @@ impl Client {
9090
for entry in opaque_map.iter() {
9191
entry
9292
.1
93-
.send(Err(MemdxError::ClosedInFlight))
93+
.send(Err(ErrorKind::Cancelled(
94+
CancellationErrorKind::ClosedInFlight,
95+
)
96+
.into()))
9497
.await
9598
.unwrap_or_default();
9699
}
@@ -100,7 +103,7 @@ impl Client {
100103
stream: FramedRead<ReadHalf<TcpStream>, KeyValueCodec>,
101104
op_cancel_rx: UnboundedReceiver<(u32, CancellationErrorKind)>,
102105
opaque_map: MutexGuard<'_, OpaqueMap>,
103-
on_connection_close_tx: Option<oneshot::Sender<MemdxResult<()>>>,
106+
on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
104107
) {
105108
drop(stream);
106109
drop(op_cancel_rx);
@@ -138,7 +141,7 @@ impl Client {
138141
drop(map);
139142

140143
sender
141-
.send(Err(MemdxError::Cancelled(cancel_info.1)))
144+
.send(Err(ErrorKind::Cancelled(cancel_info.1).into()))
142145
.await
143146
.unwrap();
144147
} else {
@@ -300,7 +303,7 @@ impl Dispatcher for Client {
300303
}
301304
}
302305

303-
async fn dispatch(&self, mut packet: RequestPacket) -> MemdxResult<ClientPendingOp> {
306+
async fn dispatch(&self, mut packet: RequestPacket) -> error::Result<ClientPendingOp> {
304307
let (response_tx, response_rx) = mpsc::channel(1);
305308
let opaque = self.register_handler(Arc::new(response_tx)).await;
306309
packet.opaque = Some(opaque);
@@ -323,14 +326,14 @@ impl Dispatcher for Client {
323326
let mut map = requests.lock().await;
324327
map.remove(&opaque);
325328

326-
Err(MemdxError::Dispatch(e.kind()))
329+
Err(ErrorKind::Dispatch(Arc::new(e)).into())
327330
}
328331
}
329332
}
330333

331-
async fn close(&self) -> MemdxResult<()> {
334+
async fn close(&self) -> error::Result<()> {
332335
if self.closed.swap(true, Ordering::SeqCst) {
333-
return Err(MemdxError::Closed);
336+
return Err(ErrorKind::Closed.into());
334337
}
335338

336339
let mut close_err = None;
@@ -354,7 +357,7 @@ impl Dispatcher for Client {
354357
Self::drain_opaque_map(map).await;
355358

356359
if let Some(e) = close_err {
357-
return Err(MemdxError::from(e));
360+
return Err(ErrorKind::Io(Arc::new(e)).into());
358361
}
359362

360363
Ok(())
@@ -405,7 +408,7 @@ mod tests {
405408
.expect("Could not connect");
406409

407410
let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
408-
let (close_tx, mut close_rx) = oneshot::channel::<crate::memdx::client::MemdxResult<()>>();
411+
let (close_tx, mut close_rx) = oneshot::channel::<crate::memdx::error::Result<()>>();
409412

410413
tokio::spawn(async move {
411414
loop {

sdk/couchbase-core/src/memdx/codec.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ use std::io;
33
use tokio_util::bytes::{Buf, BufMut, BytesMut};
44
use tokio_util::codec::{Decoder, Encoder};
55

6-
use crate::memdx::error::MemdxError;
7-
use crate::memdx::error::MemdxError::Protocol;
6+
use crate::memdx::error::{Error, ErrorKind};
87
use crate::memdx::magic::Magic;
98
use crate::memdx::opcode::OpCode;
109
use crate::memdx::packet::{RequestPacket, ResponsePacket};
@@ -17,7 +16,7 @@ pub struct KeyValueCodec(());
1716

1817
impl Decoder for KeyValueCodec {
1918
type Item = ResponsePacket;
20-
type Error = MemdxError;
19+
type Error = Error;
2120

2221
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2322
let buf_len = buf.len();
@@ -28,7 +27,11 @@ impl Decoder for KeyValueCodec {
2827

2928
let total_body_len = match buf[8..12].try_into() {
3029
Ok(v) => u32::from_be_bytes(v),
31-
Err(e) => return Err(Protocol(e.to_string())),
30+
Err(e) => {
31+
return Err(Error {
32+
kind: ErrorKind::Protocol { msg: e.to_string() }.into(),
33+
})
34+
}
3235
} as usize;
3336

3437
if buf_len < (HEADER_SIZE + total_body_len) {

sdk/couchbase-core/src/memdx/connection.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use tokio_rustls::rustls::client::danger::{
1212
use tokio_rustls::rustls::pki_types::{CertificateDer, IpAddr, ServerName, UnixTime};
1313
use tokio_rustls::TlsConnector;
1414

15-
use crate::memdx::client::MemdxResult;
16-
use crate::memdx::error::MemdxError;
15+
use crate::memdx::error::ErrorKind;
16+
use crate::memdx::error::Result;
1717

1818
#[derive(Debug, Default)]
1919
pub struct TlsConfig {
@@ -42,7 +42,7 @@ pub struct Connection {
4242
}
4343

4444
impl Connection {
45-
pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> MemdxResult<Connection> {
45+
pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> Result<Connection> {
4646
let remote_addr = addr.to_string();
4747

4848
if let Some(tls_config) = opts.tls_config {
@@ -55,19 +55,19 @@ impl Connection {
5555
} else if let Some(roots) = tls_config.root_certs {
5656
builder.with_root_certificates(roots).with_no_client_auth()
5757
} else {
58-
return Err(MemdxError::Generic(
59-
"If tls config is specified then roots or accept_all_certs must be specified"
58+
return Err(ErrorKind::InvalidArgument {
59+
msg: "If tls config is specified then roots or accept_all_certs must be specified"
6060
.to_string(),
61-
));
61+
}.into());
6262
};
6363

6464
let tcp_socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr))
6565
.await?
66-
.map_err(|e| MemdxError::Connect(e.kind()))?;
66+
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
6767

6868
tcp_socket
6969
.set_nodelay(false)
70-
.map_err(|e| MemdxError::Connect(e.kind()))?;
70+
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
7171

7272
let local_addr = match tcp_socket.local_addr() {
7373
Ok(addr) => Some(addr),
@@ -84,7 +84,7 @@ impl Connection {
8484
connector.connect(ServerName::IpAddress(IpAddr::from(addr.ip())), tcp_socket),
8585
)
8686
.await?
87-
.map_err(|e| MemdxError::Connect(e.kind()))?;
87+
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
8888

8989
Ok(Connection {
9090
inner: ConnectionType::Tls(socket),
@@ -94,10 +94,10 @@ impl Connection {
9494
} else {
9595
let socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr))
9696
.await?
97-
.map_err(|e| MemdxError::Connect(e.kind()))?;
97+
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
9898
socket
9999
.set_nodelay(false)
100-
.map_err(|e| MemdxError::Connect(e.kind()))?;
100+
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
101101

102102
let local_addr = match socket.local_addr() {
103103
Ok(addr) => Some(addr),

sdk/couchbase-core/src/memdx/dispatcher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@ use async_trait::async_trait;
44
use tokio::sync::mpsc::UnboundedSender;
55
use tokio::sync::oneshot;
66

7-
use crate::memdx::client::MemdxResult;
87
use crate::memdx::connection::Connection;
8+
use crate::memdx::error::Result;
99
use crate::memdx::packet::{RequestPacket, ResponsePacket};
1010
use crate::memdx::pendingop::ClientPendingOp;
1111

1212
#[derive(Debug)]
1313
pub struct DispatcherOptions {
1414
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
15-
pub on_connection_close_handler: Option<oneshot::Sender<MemdxResult<()>>>,
15+
pub on_connection_close_handler: Option<oneshot::Sender<Result<()>>>,
1616
}
1717

1818
#[async_trait]
1919
pub trait Dispatcher: Send + Sync {
2020
fn new(conn: Connection, opts: DispatcherOptions) -> Self;
21-
async fn dispatch(&self, packet: RequestPacket) -> MemdxResult<ClientPendingOp>;
22-
async fn close(&self) -> MemdxResult<()>;
21+
async fn dispatch(&self, packet: RequestPacket) -> Result<ClientPendingOp>;
22+
async fn close(&self) -> Result<()>;
2323
}

0 commit comments

Comments
 (0)