From 0b1d5dd019f404b01a00945a926dc22dd5de965c Mon Sep 17 00:00:00 2001 From: Charles Dixon Date: Wed, 24 Jul 2024 10:10:54 +0100 Subject: [PATCH] RSCBC-9: Implement memd routing --- sdk/couchbase-core/src/authenticator.rs | 11 +- sdk/couchbase-core/src/configparser.rs | 9 +- sdk/couchbase-core/src/configwatcher.rs | 10 +- sdk/couchbase-core/src/error.rs | 107 +++++- sdk/couchbase-core/src/kvclient.rs | 39 +- sdk/couchbase-core/src/kvclient_ops.rs | 27 +- sdk/couchbase-core/src/kvclientmanager.rs | 166 +++++---- sdk/couchbase-core/src/kvclientpool.rs | 39 +- sdk/couchbase-core/src/lib.rs | 2 +- sdk/couchbase-core/src/memdx/error.rs | 145 ++++++-- .../src/memdx/op_auth_saslauto.rs | 10 +- sdk/couchbase-core/src/memdx/ops_core.rs | 41 ++- sdk/couchbase-core/src/memdx/ops_crud.rs | 27 +- sdk/couchbase-core/src/memdx/response.rs | 96 ++++- sdk/couchbase-core/src/memdx/status.rs | 23 +- sdk/couchbase-core/src/result.rs | 1 - sdk/couchbase-core/src/vbucketmap.rs | 60 ++- sdk/couchbase-core/src/vbucketrouter.rs | 347 ++++++++++++++++++ 18 files changed, 887 insertions(+), 273 deletions(-) delete mode 100644 sdk/couchbase-core/src/result.rs create mode 100644 sdk/couchbase-core/src/vbucketrouter.rs diff --git a/sdk/couchbase-core/src/authenticator.rs b/sdk/couchbase-core/src/authenticator.rs index 65caa551..34e92437 100644 --- a/sdk/couchbase-core/src/authenticator.rs +++ b/sdk/couchbase-core/src/authenticator.rs @@ -1,16 +1,13 @@ use std::fmt::Debug; -use crate::result::CoreResult; +use crate::error::Result; use crate::service_type::ServiceType; pub trait Authenticator: Debug + Send + Sync { // TODO: get_client_certificate needs some thought about how to expose the certificate // fn get_client_certificate(service: ServiceType, host_port: String) -> - fn get_credentials( - &self, - service_type: ServiceType, - host_port: String, - ) -> CoreResult; + fn get_credentials(&self, service_type: ServiceType, host_port: String) + -> Result; } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -30,7 +27,7 @@ impl Authenticator for PasswordAuthenticator { &self, _service_type: ServiceType, _host_port: String, - ) -> CoreResult { + ) -> Result { Ok(UserPassPair { username: self.username.clone(), password: self.password.clone(), diff --git a/sdk/couchbase-core/src/configparser.rs b/sdk/couchbase-core/src/configparser.rs index 0f82a576..ca9e5f8e 100644 --- a/sdk/couchbase-core/src/configparser.rs +++ b/sdk/couchbase-core/src/configparser.rs @@ -1,20 +1,17 @@ use std::collections::HashMap; use crate::cbconfig::{TerseConfig, TerseExtNodePorts, VBucketServerMap}; +use crate::error::Result; use crate::parsedconfig::{ BucketType, ParsedConfig, ParsedConfigBucket, ParsedConfigFeatures, ParsedConfigNode, ParsedConfigNodeAddresses, ParsedConfigNodePorts, }; -use crate::result::CoreResult; use crate::vbucketmap::VbucketMap; pub(crate) struct ConfigParser {} impl ConfigParser { - pub fn parse_terse_config( - config: TerseConfig, - source_hostname: &str, - ) -> CoreResult { + pub fn parse_terse_config(config: TerseConfig, source_hostname: &str) -> Result { let rev_id = config.rev; let rev_epoch = config.rev_epoch.unwrap_or_default(); @@ -119,7 +116,7 @@ impl ConfigParser { fn parse_vbucket_server_map( vbucket_server_map: Option, - ) -> CoreResult> { + ) -> Result> { if let Some(vbucket_server_map) = vbucket_server_map { if vbucket_server_map.vbucket_map.is_empty() { return Ok(None); diff --git a/sdk/couchbase-core/src/configwatcher.rs b/sdk/couchbase-core/src/configwatcher.rs index 71a2377c..265b6866 100644 --- a/sdk/couchbase-core/src/configwatcher.rs +++ b/sdk/couchbase-core/src/configwatcher.rs @@ -10,16 +10,16 @@ use tokio::time::sleep; use crate::cbconfig::TerseConfig; use crate::configparser::ConfigParser; +use crate::error::Result; use crate::kvclient::KvClient; use crate::kvclient_ops::KvClientOps; use crate::kvclientmanager::KvClientManager; use crate::memdx::request::GetClusterConfigRequest; use crate::parsedconfig::ParsedConfig; -use crate::result::CoreResult; pub(crate) trait ConfigWatcher { fn watch(&self, on_shutdown_rx: Receiver<()>) -> Receiver; - fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> impl Future>; + fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> impl Future>; } pub(crate) struct ConfigWatcherMemdConfig { @@ -41,7 +41,7 @@ impl ConfigWatcherMemdInner where M: KvClientManager, { - pub async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> CoreResult<()> { + pub async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> Result<()> { let mut endpoints = self.endpoints.lock().unwrap(); *endpoints = config.endpoints; @@ -139,7 +139,7 @@ where } } - async fn poll_one(&self, endpoint: String) -> CoreResult { + async fn poll_one(&self, endpoint: String) -> Result { let client = self.kv_client_manager.get_client(endpoint).await?; let addr = client.remote_addr(); @@ -189,7 +189,7 @@ where on_new_config_rx } - async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> CoreResult<()> { + async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> Result<()> { self.inner.reconfigure(config).await } } diff --git a/sdk/couchbase-core/src/error.rs b/sdk/couchbase-core/src/error.rs index ba932bae..407ce77d 100644 --- a/sdk/couchbase-core/src/error.rs +++ b/sdk/couchbase-core/src/error.rs @@ -1,29 +1,100 @@ use std::fmt::Display; -use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper}; -use crate::memdx::error::{Error, ErrorKind}; - -#[derive(thiserror::Error, Debug)] -pub enum CoreError { - #[error("Dispatch error {0}")] - Dispatch(Error), - #[error("Placeholder error {0}")] - Placeholder(String), - #[error("Placeholder memdx wrapper error {0}")] - PlaceholderMemdxWrapper(Error), +use crate::memdx::error::Error as MemdxError; + +pub type Result = std::result::Result; + +#[derive(thiserror::Error, Debug, Clone)] +#[error("{kind}")] +#[non_exhaustive] +pub struct Error { + pub kind: Box, + // #[source] + // pub(crate) source: Option>, } -impl From for CoreError { - fn from(value: Error) -> Self { - match value.kind.as_ref() { - ErrorKind::Dispatch(_) => Dispatch(value), - _ => PlaceholderMemdxWrapper(value), +impl Error { + pub(crate) fn new(kind: ErrorKind) -> Self { + Self { + kind: Box::new(kind), + // source: None, + } + } + + // pub(crate) fn with_source(kind: ErrorKind, source: impl std::error::Error + 'static) -> Self { + // Self { + // kind: Box::new(kind), + // source: Some(Box::new(source)), + // } + // } + + pub fn is_memdx_error(&self) -> Option<&MemdxError> { + match self.kind.as_ref() { + ErrorKind::MemdxError(e) => Some(e), + _ => None, } } + + pub(crate) fn new_invalid_arguments_error(msg: &str) -> Self { + Self::new(ErrorKind::InvalidArgument { + msg: msg.to_string(), + }) + } + + pub(crate) fn new_internal_error(msg: &str) -> Self { + Self::new(ErrorKind::Internal { + msg: msg.to_string(), + }) + } +} + +#[derive(thiserror::Error, Debug, Clone)] +#[non_exhaustive] +pub enum ErrorKind { + #[error("Vbucket map outdated")] + VbucketMapOutdated, + #[error("An error occurred during serialization/deserialization {msg}")] + #[non_exhaustive] + JSONError { msg: String }, + #[error("Invalid argument {msg}")] + #[non_exhaustive] + InvalidArgument { msg: String }, + #[error("{0}")] + MemdxError(MemdxError), + #[error("Endpoint not known {endpoint}")] + #[non_exhaustive] + EndpointNotKnown { endpoint: String }, + #[error("no endpoints available")] + #[non_exhaustive] + NoEndpointsAvailable, + #[error("Shutdown")] + Shutdown, + #[error("Internal error {msg}")] + #[non_exhaustive] + Internal { msg: String }, +} + +impl From for Error +where + ErrorKind: From, +{ + fn from(err: E) -> Self { + Self { + kind: Box::new(err.into()), + } + } +} + +impl From for Error { + fn from(value: MemdxError) -> Self { + Self::new(ErrorKind::MemdxError(value)) + } } -impl From for CoreError { +impl From for Error { fn from(value: serde_json::Error) -> Self { - Placeholder(value.to_string()) + Self::new(ErrorKind::JSONError { + msg: value.to_string(), + }) } } diff --git a/sdk/couchbase-core/src/kvclient.rs b/sdk/couchbase-core/src/kvclient.rs index 4662d497..d3f5d80d 100644 --- a/sdk/couchbase-core/src/kvclient.rs +++ b/sdk/couchbase-core/src/kvclient.rs @@ -12,7 +12,8 @@ use tokio_rustls::rustls::RootCertStore; use uuid::Uuid; use crate::authenticator::Authenticator; -use crate::error::CoreError; +use crate::error::Error; +use crate::error::Result; use crate::memdx::auth_mechanism::AuthMechanism; use crate::memdx::connection::{Connection, ConnectOptions}; use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions}; @@ -21,7 +22,6 @@ use crate::memdx::op_auth_saslauto::SASLAuthAutoOptions; use crate::memdx::op_bootstrap::BootstrapOptions; use crate::memdx::packet::ResponsePacket; use crate::memdx::request::{GetErrorMapRequest, HelloRequest, SelectBucketRequest}; -use crate::result::CoreResult; use crate::service_type::ServiceType; #[derive(Debug, Clone)] @@ -62,13 +62,13 @@ pub(crate) trait KvClient: Sized + PartialEq + Send + Sync { fn new( config: KvClientConfig, opts: KvClientOptions, - ) -> impl Future> + Send; - fn reconfigure(&self, config: KvClientConfig) -> impl Future> + Send; + ) -> impl Future> + Send; + fn reconfigure(&self, config: KvClientConfig) -> impl Future> + Send; fn has_feature(&self, feature: HelloFeature) -> bool; fn load_factor(&self) -> f64; fn remote_addr(&self) -> SocketAddr; fn local_addr(&self) -> Option; - fn close(&self) -> impl Future> + Send; + fn close(&self) -> impl Future> + Send; fn id(&self) -> &str; } @@ -89,8 +89,6 @@ pub(crate) struct StdKvClient { // asynchronously and we do not support changing selected buckets. selected_bucket: Mutex>, - closed: Arc, - id: String, } @@ -107,7 +105,7 @@ impl KvClient for StdKvClient where D: Dispatcher, { - async fn new(config: KvClientConfig, opts: KvClientOptions) -> CoreResult> { + async fn new(config: KvClientConfig, opts: KvClientOptions) -> Result> { let requested_features = if config.disable_default_features { vec![] } else { @@ -170,8 +168,8 @@ where if should_bootstrap && config.disable_bootstrap { // TODO: error model needs thought. - return Err(CoreError::Placeholder( - "Bootstrap was disabled but options requiring bootstrap were specified".to_string(), + return Err(Error::new_invalid_arguments_error( + "Bootstrap was disabled but options requiring bootstrap were specified", )); } @@ -212,7 +210,6 @@ where current_config: Mutex::new(config), supported_features: vec![], selected_bucket: Mutex::new(None), - closed, id: id.clone(), }; @@ -260,7 +257,7 @@ where Ok(kv_cli) } - async fn reconfigure(&self, config: KvClientConfig) -> CoreResult<()> { + async fn reconfigure(&self, config: KvClientConfig) -> Result<()> { let mut current_config = self.current_config.lock().await; // TODO: compare root certs or something somehow. @@ -271,15 +268,15 @@ where && current_config.disable_error_map == config.disable_error_map && current_config.disable_bootstrap == config.disable_bootstrap) { - return Err(CoreError::Placeholder( - "Cannot reconfigure due to conflicting options".to_string(), + return Err(Error::new_invalid_arguments_error( + "Cannot reconfigure due to conflicting options", )); } let selected_bucket_name = if current_config.selected_bucket != config.selected_bucket { if current_config.selected_bucket.is_some() { - return Err(CoreError::Placeholder( - "Cannot reconfigure from one selected bucket to another".to_string(), + return Err(Error::new_invalid_arguments_error( + "Cannot reconfigure from one selected bucket to another", )); } @@ -292,8 +289,8 @@ where }; if *current_config.deref() != config { - return Err(CoreError::Placeholder( - "Client config after reconfigure did not match new configuration".to_string(), + return Err(Error::new_invalid_arguments_error( + "Client config after reconfigure did not match new configuration", )); } @@ -336,11 +333,7 @@ where self.local_addr } - async fn close(&self) -> CoreResult<()> { - if self.closed.swap(true, Ordering::Relaxed) { - return Err(CoreError::Placeholder("Client closed".to_string())); - } - + async fn close(&self) -> Result<()> { Ok(self.cli.close().await?) } diff --git a/sdk/couchbase-core/src/kvclient_ops.rs b/sdk/couchbase-core/src/kvclient_ops.rs index 94e1c872..dba2b806 100644 --- a/sdk/couchbase-core/src/kvclient_ops.rs +++ b/sdk/couchbase-core/src/kvclient_ops.rs @@ -1,6 +1,7 @@ use std::future::Future; -use crate::error::CoreError; +use crate::error::Error; +use crate::error::Result; use crate::kvclient::{KvClient, StdKvClient}; use crate::memdx::dispatcher::Dispatcher; use crate::memdx::hello_feature::HelloFeature; @@ -12,29 +13,28 @@ use crate::memdx::request::{GetClusterConfigRequest, GetRequest, SelectBucketReq use crate::memdx::response::{ BootstrapResult, GetClusterConfigResponse, GetResponse, SelectBucketResponse, SetResponse, }; -use crate::result::CoreResult; pub(crate) trait KvClientOps: Sized + Send + Sync { - fn set(&self, req: SetRequest) -> impl Future> + Send; - fn get(&self, req: GetRequest) -> impl Future> + Send; + fn set(&self, req: SetRequest) -> impl Future> + Send; + fn get(&self, req: GetRequest) -> impl Future> + Send; fn get_cluster_config( &self, req: GetClusterConfigRequest, - ) -> impl Future> + Send; + ) -> impl Future> + Send; } impl KvClientOps for StdKvClient where D: Dispatcher, { - async fn set(&self, req: SetRequest) -> CoreResult { + async fn set(&self, req: SetRequest) -> Result { let mut op = self.ops_crud().set(self.client(), req).await?; let res = op.recv().await?; Ok(res) } - async fn get(&self, req: GetRequest) -> CoreResult { + async fn get(&self, req: GetRequest) -> Result { let mut op = self.ops_crud().get(self.client(), req).await?; let res = op.recv().await?; @@ -44,7 +44,7 @@ where async fn get_cluster_config( &self, req: GetClusterConfigRequest, - ) -> CoreResult { + ) -> Result { let mut op = OpsCore {}.get_cluster_config(self.client(), req).await?; let res = op.recv().await?; @@ -56,20 +56,17 @@ impl StdKvClient where D: Dispatcher, { - pub async fn bootstrap(&self, opts: BootstrapOptions) -> CoreResult { + pub async fn bootstrap(&self, opts: BootstrapOptions) -> Result { OpBootstrap::bootstrap(OpsCore {}, self.client(), opts) .await - .map_err(CoreError::from) + .map_err(Error::from) } - pub async fn select_bucket( - &self, - req: SelectBucketRequest, - ) -> CoreResult { + pub async fn select_bucket(&self, req: SelectBucketRequest) -> Result { let mut op = OpsCore {} .select_bucket(self.client(), req) .await - .map_err(CoreError::from)?; + .map_err(Error::from)?; let res = op.recv().await?; Ok(res) diff --git a/sdk/couchbase-core/src/kvclientmanager.rs b/sdk/couchbase-core/src/kvclientmanager.rs index 5197df1e..ab06acba 100644 --- a/sdk/couchbase-core/src/kvclientmanager.rs +++ b/sdk/couchbase-core/src/kvclientmanager.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::error::Error; use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -6,11 +7,13 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; -use crate::error::CoreError; +use crate::error::ErrorKind; +use crate::error::Result; use crate::kvclient::{KvClient, KvClientConfig}; +use crate::kvclient_ops::KvClientOps; use crate::kvclientpool::{KvClientPool, KvClientPoolConfig, KvClientPoolOptions}; use crate::memdx::packet::ResponsePacket; -use crate::result::CoreResult; +use crate::memdx::response::TryFromClientResponse; pub(crate) trait KvClientManager: Sized + Send + Sync { type Pool: KvClientPool + Send + Sync; @@ -18,35 +21,24 @@ pub(crate) trait KvClientManager: Sized + Send + Sync { fn new( config: KvClientManagerConfig, opts: KvClientManagerOptions, - ) -> impl Future> + Send; - fn reconfigure( - &self, - config: KvClientManagerConfig, - ) -> impl Future> + Send; + ) -> impl Future> + Send; + fn reconfigure(&self, config: KvClientManagerConfig) + -> impl Future> + Send; fn get_client( &self, endpoint: String, - ) -> impl Future< - Output = CoreResult::Pool as KvClientPool>::Client>>, - > + Send; + ) -> impl Future::Pool as KvClientPool>::Client>>> + + Send; fn get_random_client( &self, - ) -> impl Future< - Output = CoreResult::Pool as KvClientPool>::Client>>, - > + Send; + ) -> impl Future::Pool as KvClientPool>::Client>>> + + Send; fn shutdown_client( &self, endpoint: String, client: Arc<<::Pool as KvClientPool>::Client>, - ) -> impl Future> + Send; - fn close(&self) -> impl Future> + Send; - fn orchestrate_operation( - &self, - endpoint: String, - operation: impl Fn(Arc<<::Pool as KvClientPool>::Client>) -> Fut, - ) -> impl Future> - where - Fut: Future> + Send; + ) -> impl Future> + Send; + fn close(&self) -> impl Future> + Send; } #[derive(Debug)] @@ -91,20 +83,20 @@ impl

StdKvClientManager

where P: KvClientPool, { - async fn get_pool(&self, endpoint: String) -> CoreResult> { + async fn get_pool(&self, endpoint: String) -> Result> { let state = self.state.lock().await; let pool = match state.client_pools.get(&endpoint) { Some(p) => p, None => { - return Err(CoreError::Placeholder("Endpoint not known".to_string())); + return Err(ErrorKind::EndpointNotKnown { endpoint }.into()); } }; Ok(pool.pool.clone()) } - async fn get_random_pool(&self) -> CoreResult> { + async fn get_random_pool(&self) -> Result> { let state = self.state.lock().await; // Just pick one at random for now @@ -112,7 +104,7 @@ where return Ok(pool.pool.clone()); } - Err(CoreError::Placeholder("Endpoint not known".to_string())) + Err(ErrorKind::NoEndpointsAvailable.into()) } async fn create_pool(&self, pool_config: KvClientPoolConfig) -> KvClientManagerPool

{ @@ -139,7 +131,7 @@ where { type Pool = P; - async fn new(config: KvClientManagerConfig, opts: KvClientManagerOptions) -> CoreResult { + async fn new(config: KvClientManagerConfig, opts: KvClientManagerOptions) -> Result { let manager = Self { state: Mutex::new(KvClientManagerState { client_pools: Default::default(), @@ -151,7 +143,7 @@ where Ok(manager) } - async fn reconfigure(&self, config: KvClientManagerConfig) -> CoreResult<()> { + async fn reconfigure(&self, config: KvClientManagerConfig) -> Result<()> { let mut guard = self.state.lock().await; let mut old_pools = std::mem::take(&mut guard.client_pools); @@ -194,7 +186,7 @@ where async fn get_client( &self, endpoint: String, - ) -> CoreResult::Pool as KvClientPool>::Client>> { + ) -> Result::Pool as KvClientPool>::Client>> { let pool = self.get_pool(endpoint).await?; pool.get_client().await @@ -202,7 +194,7 @@ where async fn get_random_client( &self, - ) -> CoreResult::Pool as KvClientPool>::Client>> { + ) -> Result::Pool as KvClientPool>::Client>> { let pool = self.get_random_pool().await?; pool.get_client().await @@ -212,7 +204,7 @@ where &self, endpoint: String, client: Arc<<::Pool as KvClientPool>::Client>, - ) -> CoreResult<()> { + ) -> Result<()> { let pool = self.get_pool(endpoint).await?; pool.shutdown_client(client).await; @@ -220,7 +212,7 @@ where Ok(()) } - async fn close(&self) -> CoreResult<()> { + async fn close(&self) -> Result<()> { let mut guard = self.state.lock().await; let mut old_pools = std::mem::take(&mut guard.client_pools); @@ -232,41 +224,56 @@ where Ok(()) } +} - async fn orchestrate_operation( - &self, - endpoint: String, - operation: impl Fn(Arc) -> Fut, - ) -> CoreResult - where - Fut: Future> + Send, - { - loop { - let client = self.get_client(endpoint.clone()).await?; +pub(crate) async fn orchestrate_memd_client( + manager: &M, + endpoint: String, + mut operation: impl FnMut(Arc<<::Pool as KvClientPool>::Client>) -> Fut, +) -> Result +where + M: KvClientManager, + Fut: Future> + Send, +{ + loop { + let client = manager.get_client(endpoint.clone()).await?; - let res = operation(client.clone()).await; - match res { - Ok(r) => { - return Ok(r); - } - Err(e) => match e { - CoreError::Dispatch(_) => { + let res = operation(client.clone()).await; + match res { + Ok(r) => { + return Ok(r); + } + Err(e) => { + if let Some(memdx_err) = e.is_memdx_error() { + if memdx_err.is_dispatch_error() { // This was a dispatch error, so we can just try with // a different client instead... // TODO: Log something - self.shutdown_client(endpoint.clone(), client) + manager + .shutdown_client(endpoint.clone(), client) .await .unwrap_or_default(); + continue; } - _ => { - return Err(e); - } - }, + } + + return Err(e); } } } } +pub(crate) struct OrchestrateMemdClientAsyncFnMut {} + +impl OrchestrateMemdClientAsyncFnMut { + async fn call(&mut self, client: Arc) -> Result + where + K: KvClient + KvClientOps + PartialEq + Sync + Send + 'static, + { + todo!() + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -281,7 +288,8 @@ mod tests { use crate::kvclient::{KvClient, KvClientConfig, StdKvClient}; use crate::kvclient_ops::KvClientOps; use crate::kvclientmanager::{ - KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager, + KvClientManager, KvClientManagerConfig, KvClientManagerOptions, orchestrate_memd_client, + StdKvClientManager, }; use crate::kvclientpool::{KvClientPool, NaiveKvClientPool}; use crate::memdx::client::Client; @@ -346,30 +354,30 @@ mod tests { .await .unwrap(); - let result = manager - .orchestrate_operation( - "192.168.107.128:11210".to_string(), - |client: Arc>| async move { - client - .set(SetRequest { - collection_id: 0, - key: "test".as_bytes().into(), - vbucket_id: 1, - flags: 0, - value: "test".as_bytes().into(), - datatype: 0, - expiry: None, - preserve_expiry: None, - cas: None, - on_behalf_of: None, - durability_level: None, - durability_level_timeout: None, - }) - .await - }, - ) - .await - .unwrap(); + let result = orchestrate_memd_client( + &manager, + "192.168.107.128:11210".to_string(), + |client: Arc>| async move { + client + .set(SetRequest { + collection_id: 0, + key: "test".as_bytes().into(), + vbucket_id: 1, + flags: 0, + value: "test".as_bytes().into(), + datatype: 0, + expiry: None, + preserve_expiry: None, + cas: None, + on_behalf_of: None, + durability_level: None, + durability_level_timeout: None, + }) + .await + }, + ) + .await + .unwrap(); dbg!(result); diff --git a/sdk/couchbase-core/src/kvclientpool.rs b/sdk/couchbase-core/src/kvclientpool.rs index 7b9b502d..5a96395b 100644 --- a/sdk/couchbase-core/src/kvclientpool.rs +++ b/sdk/couchbase-core/src/kvclientpool.rs @@ -8,12 +8,12 @@ use tokio::sync::{mpsc, Mutex}; use tokio::sync::mpsc::UnboundedSender; use tokio::time::{Instant, sleep}; -use crate::error::CoreError; +use crate::error::{Error, ErrorKind}; +use crate::error::Result; use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions}; use crate::kvclient_ops::KvClientOps; use crate::memdx::dispatcher::Dispatcher; use crate::memdx::packet::ResponsePacket; -use crate::result::CoreResult; pub(crate) trait KvClientPool: Sized + Send + Sync { type Client: KvClient + KvClientOps + Send + Sync; @@ -22,13 +22,10 @@ pub(crate) trait KvClientPool: Sized + Send + Sync { config: KvClientPoolConfig, opts: KvClientPoolOptions, ) -> impl Future + Send; - fn get_client(&self) -> impl Future>> + Send; + fn get_client(&self) -> impl Future>> + Send; fn shutdown_client(&self, client: Arc) -> impl Future + Send; - fn close(&self) -> impl Future> + Send; - fn reconfigure( - &self, - config: KvClientPoolConfig, - ) -> impl Future> + Send; + fn close(&self) -> impl Future> + Send; + fn reconfigure(&self, config: KvClientPoolConfig) -> impl Future> + Send; } #[derive(Debug, Clone)] @@ -56,7 +53,7 @@ where client_idx: usize, - connect_error: Option, + connect_error: Option, connect_error_time: Option, orphan_handler: Arc>, @@ -135,7 +132,7 @@ where } } - async fn start_new_client(&mut self) -> CoreResult { + async fn start_new_client(&mut self) -> Result { loop { if let Some(error_time) = self.connect_error_time { let connect_wait_period = @@ -164,15 +161,15 @@ where client.close().await.unwrap_or_default(); } - return Err(CoreError::Placeholder("Closed".to_string())); + return Err(ErrorKind::Shutdown.into()); } client_result } - async fn get_client_slow(&mut self) -> CoreResult> { + async fn get_client_slow(&mut self) -> Result> { if self.closed.load(Ordering::SeqCst) { - return Err(CoreError::Placeholder("Closed".to_string())); + return Err(ErrorKind::Shutdown.into()); } if !self.clients.is_empty() { @@ -184,14 +181,14 @@ where } if let Some(e) = &self.connect_error { - return Err(CoreError::Placeholder(e.to_string())); + return Err(e.clone()); } self.check_connections().await; Box::pin(self.get_client_slow()).await } - pub async fn get_client(&mut self) -> CoreResult> { + pub async fn get_client(&mut self) -> Result> { if !self.clients.is_empty() { let idx = self.client_idx; self.client_idx += 1; @@ -227,9 +224,9 @@ where self.check_connections().await; } - pub async fn close(&mut self) -> CoreResult<()> { + pub async fn close(&mut self) -> Result<()> { if self.closed.swap(true, Ordering::SeqCst) { - return Err(CoreError::Placeholder("Closed".to_string())); + return Err(ErrorKind::Shutdown.into()); } for mut client in &self.clients { @@ -240,7 +237,7 @@ where Ok(()) } - pub async fn reconfigure(&mut self, config: KvClientPoolConfig) -> CoreResult<()> { + pub async fn reconfigure(&mut self, config: KvClientPoolConfig) -> Result<()> { let mut old_clients = self.clients.clone(); let mut new_clients = vec![]; for client in old_clients { @@ -290,7 +287,7 @@ where NaiveKvClientPool { inner: clients } } - async fn get_client(&self) -> CoreResult> { + async fn get_client(&self) -> Result> { let mut clients = self.inner.lock().await; clients.get_client().await @@ -302,12 +299,12 @@ where clients.shutdown_client(client).await; } - async fn close(&self) -> CoreResult<()> { + async fn close(&self) -> Result<()> { let mut inner = self.inner.lock().await; inner.close().await } - async fn reconfigure(&self, config: KvClientPoolConfig) -> CoreResult<()> { + async fn reconfigure(&self, config: KvClientPoolConfig) -> Result<()> { let mut inner = self.inner.lock().await; inner.reconfigure(config).await } diff --git a/sdk/couchbase-core/src/lib.rs b/sdk/couchbase-core/src/lib.rs index 97ad3b07..b10867ed 100644 --- a/sdk/couchbase-core/src/lib.rs +++ b/sdk/couchbase-core/src/lib.rs @@ -9,7 +9,7 @@ mod kvclientmanager; mod kvclientpool; pub mod memdx; mod parsedconfig; -pub mod result; mod scram; pub mod service_type; mod vbucketmap; +mod vbucketrouter; diff --git a/sdk/couchbase-core/src/memdx/error.rs b/sdk/couchbase-core/src/memdx/error.rs index df925fed..2738c48f 100644 --- a/sdk/couchbase-core/src/memdx/error.rs +++ b/sdk/couchbase-core/src/memdx/error.rs @@ -1,11 +1,17 @@ +use std::error::Error as StdError; use std::fmt::{Display, Formatter, Pointer}; use std::io; +use std::net::SocketAddr; use std::sync::Arc; +use serde::Deserialize; use thiserror::Error; use tokio::time::error::Elapsed; use crate::memdx::error; +use crate::memdx::error::ErrorKind::Server; +use crate::memdx::opcode::OpCode; +use crate::memdx::packet::ResponsePacket; use crate::memdx::status::Status; use crate::scram::ScramError; @@ -46,33 +52,121 @@ pub enum ErrorKind { #[error("Invalid argument {msg}")] #[non_exhaustive] InvalidArgument { msg: String }, + #[error("No supported auth mechanism was found")] + #[non_exhaustive] + NoSupportedAuthMechanisms, } -#[derive(Clone, Debug, Error, PartialEq, Eq)] -#[error("Server error: {kind}")] +#[derive(Clone, Debug, PartialEq, Eq)] #[non_exhaustive] pub struct ServerError { pub kind: ServerErrorKind, pub config: Option>, - pub context: Option, + pub context: Option>, + pub op_code: OpCode, + pub status: Status, + pub dispatched_to: String, + pub dispatched_from: String, + pub opaque: u32, } +impl Display for ServerError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut base_msg = format!("Server error: {}, status: 0x{:02x}, opcode: {}, dispatched from: {}, dispatched to: {}, opaque: {}", + self.kind, u16::from(self.status), self.op_code, self.dispatched_from, self.dispatched_to, self.opaque); + + if let Some(context) = &self.context { + if let Some(parsed) = Self::parse_context(context) { + base_msg = format!("{}, (context: {})", base_msg, parsed.text); + } + } + + write!(f, "{}", base_msg) + } +} + +impl StdError for ServerError {} + impl ServerError { - pub(crate) fn new(kind: ServerErrorKind) -> Self { + pub(crate) fn new( + kind: ServerErrorKind, + resp: &ResponsePacket, + dispatched_to: &Option, + dispatched_from: &Option, + ) -> Self { + let dispatched_to = if let Some(to) = dispatched_to { + to.to_string() + } else { + String::new() + }; + let dispatched_from = if let Some(from) = dispatched_from { + from.to_string() + } else { + String::new() + }; Self { kind, config: None, context: None, + op_code: resp.op_code, + status: resp.status, + dispatched_to, + dispatched_from, + opaque: resp.opaque, } } + + pub fn parse_context(context: &[u8]) -> Option { + if context.is_empty() { + return None; + } + + let context_json: ServerErrorContextJson = match serde_json::from_slice(context) { + Ok(c) => c, + Err(_) => { + return None; + } + }; + + let text = context_json.error.context.unwrap_or_default(); + + let error_ref = context_json.error_ref; + + let manifest_rev = context_json + .manifest_rev + .map(|manifest_rev| manifest_rev.parse().unwrap_or_default()); + + Some(ServerErrorContext { + text, + error_ref, + manifest_rev, + }) + } +} + +#[derive(Deserialize, Clone, Debug, PartialEq, Eq, Default)] +struct ServerErrorContextJsonContext { + #[serde(alias = "context")] + context: Option, +} + +#[derive(Deserialize, Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] +struct ServerErrorContextJson { + #[serde(alias = "text", default)] + pub error: ServerErrorContextJsonContext, + #[serde(alias = "ref")] + pub error_ref: Option, + #[serde(alias = "manifest_uid")] + pub manifest_rev: Option, } #[derive(Clone, Debug, PartialEq, Eq)] #[non_exhaustive] pub struct ServerErrorContext { pub text: String, - pub error_ref: String, - pub manifest_rev: u64, + pub error_ref: Option, + pub manifest_rev: Option, } #[derive(Error, Clone, Debug, Eq, Hash, PartialEq)] @@ -120,20 +214,31 @@ impl Error { } pub fn has_server_config(&self) -> Option<&Vec> { - if let ErrorKind::Server(ServerError { config, .. }) = self.kind.as_ref() { + if let Server(ServerError { config, .. }) = self.kind.as_ref() { config.as_ref() } else { None } } - pub fn has_server_error_context(&self) -> Option<&ServerErrorContext> { - if let ErrorKind::Server(ServerError { context, .. }) = self.kind.as_ref() { + pub fn has_server_error_context(&self) -> Option<&Vec> { + if let Server(ServerError { context, .. }) = self.kind.as_ref() { context.as_ref() } else { None } } + + pub fn is_dispatch_error(&self) -> bool { + matches!(self.kind.as_ref(), ErrorKind::Dispatch(_)) + } + + pub fn is_notmyvbucket_error(&self) -> bool { + match self.kind.as_ref() { + Server(e) => e.kind == ServerErrorKind::NotMyVbucket, + _ => false, + } + } } impl Display for CancellationErrorKind { @@ -159,28 +264,24 @@ where } } -impl From for Error { - fn from(kind: ServerErrorKind) -> Self { +impl From for Error { + fn from(value: ServerError) -> Self { Self { - kind: Box::new(ErrorKind::Server(ServerError::new(kind))), + kind: Box::new(Server(value)), } } } -impl From for Error { - fn from(value: io::Error) -> Self { - Self { - kind: Box::new(ErrorKind::Io(Arc::new(value))), - } +impl From for Error { + fn from(e: ScramError) -> Self { + ErrorKind::Protocol { msg: e.to_string() }.into() } } -impl From for Error { - fn from(value: ScramError) -> Self { +impl From for Error { + fn from(value: io::Error) -> Self { Self { - kind: Box::new(ErrorKind::Server(ServerError::new(ServerErrorKind::Auth { - msg: value.to_string(), - }))), + kind: Box::new(ErrorKind::Io(Arc::new(value))), } } } diff --git a/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs b/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs index 0a5845f1..8e86a431 100644 --- a/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs +++ b/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs @@ -4,8 +4,8 @@ use tokio::time::Instant; use crate::memdx::auth_mechanism::AuthMechanism; use crate::memdx::dispatcher::Dispatcher; -use crate::memdx::error::{ErrorKind, ServerErrorKind}; use crate::memdx::error::CancellationErrorKind::{RequestCancelled, Timeout}; +use crate::memdx::error::ErrorKind; use crate::memdx::error::Result; use crate::memdx::op_auth_saslbyname::{ OpSASLAuthByNameEncoder, OpsSASLAuthByName, SASLAuthByNameOptions, @@ -106,13 +106,7 @@ impl OpsSASLAuthAuto { let selected_mech = match selected_mech { Some(mech) => mech, None => { - return Err(ServerErrorKind::Auth { - msg: format!( - "No supported auth mechanism was found (enabled: {:?}, server: {:?})", - opts.enabled_mechs, server_mechs - ), - } - .into()); + return Err(ErrorKind::NoSupportedAuthMechanisms.into()); } }; diff --git a/sdk/couchbase-core/src/memdx/ops_core.rs b/sdk/couchbase-core/src/memdx/ops_core.rs index b5b73d29..3f476e27 100644 --- a/sdk/couchbase-core/src/memdx/ops_core.rs +++ b/sdk/couchbase-core/src/memdx/ops_core.rs @@ -1,9 +1,10 @@ use std::io::Write; +use std::net::SocketAddr; use byteorder::{BigEndian, WriteBytesExt}; use crate::memdx::dispatcher::Dispatcher; -use crate::memdx::error::{Error, ServerErrorKind}; +use crate::memdx::error::{Error, ErrorKind, ServerError, ServerErrorKind}; use crate::memdx::error::Result; use crate::memdx::magic::Magic; use crate::memdx::op_auth_saslauto::OpSASLAutoEncoder; @@ -27,17 +28,41 @@ use crate::memdx::status::Status; pub struct OpsCore {} impl OpsCore { - pub(crate) fn decode_error(resp: &ResponsePacket) -> Error { + pub(crate) fn decode_error_context( + resp: &ResponsePacket, + kind: ServerErrorKind, + dispatched_to: &Option, + dispatched_from: &Option, + ) -> Error { + let mut base_cause = ServerError::new(kind, resp, dispatched_to, dispatched_from); + + if let Some(value) = &resp.value { + if resp.status == Status::NotMyVbucket { + // TODO: unsure what this actually does. + base_cause.config = Some(value.to_vec()); + } else { + base_cause.context = Some(value.to_vec()) + } + } + + ErrorKind::Server(base_cause).into() + } + + pub(crate) fn decode_error( + resp: &ResponsePacket, + dispatched_to: &Option, + dispatched_from: &Option, + ) -> Error { let status = resp.status; - if status == Status::NotMyVbucket { - ServerErrorKind::NotMyVbucket.into() + let base_error_kind = if status == Status::NotMyVbucket { + ServerErrorKind::NotMyVbucket } else if status == Status::TmpFail { - ServerErrorKind::TmpFail.into() + ServerErrorKind::TmpFail } else { - ServerErrorKind::UnknownStatus { status }.into() - } + ServerErrorKind::UnknownStatus { status } + }; - // TODO: decode error context + Self::decode_error_context(resp, base_error_kind, dispatched_to, dispatched_from) } } diff --git a/sdk/couchbase-core/src/memdx/ops_crud.rs b/sdk/couchbase-core/src/memdx/ops_crud.rs index ef28d453..8645d505 100644 --- a/sdk/couchbase-core/src/memdx/ops_crud.rs +++ b/sdk/couchbase-core/src/memdx/ops_crud.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::time::Duration; use byteorder::{BigEndian, WriteBytesExt}; @@ -5,7 +6,7 @@ use bytes::{BufMut, BytesMut}; use crate::memdx::dispatcher::Dispatcher; use crate::memdx::durability_level::{DurabilityLevel, DurabilityLevelSettings}; -use crate::memdx::error::{Error, ErrorKind, ServerErrorKind}; +use crate::memdx::error::{Error, ErrorKind, ServerError, ServerErrorKind}; use crate::memdx::error::Result; use crate::memdx::ext_frame_code::{ExtReqFrameCode, ExtResFrameCode}; use crate::memdx::magic::Magic; @@ -178,24 +179,32 @@ impl OpsCrud { Ok(magic) } - pub(crate) fn decode_common_status(status: Status) -> Result<()> { - let err = match status { - Status::CollectionUnknown => ServerErrorKind::UnknownCollectionID.into(), - Status::AccessError => ServerErrorKind::Access.into(), + pub(crate) fn decode_common_status( + resp: &ResponsePacket, + dispatched_to: &Option, + dispatched_from: &Option, + ) -> Result<()> { + let kind = match resp.status { + Status::CollectionUnknown => ServerErrorKind::UnknownCollectionID, + Status::AccessError => ServerErrorKind::Access, _ => { return Ok(()); } }; - Err(err) + Err(ErrorKind::Server(ServerError::new(kind, resp, dispatched_to, dispatched_from)).into()) } - pub(crate) fn decode_common_error(resp: &ResponsePacket) -> Error { - if let Err(e) = Self::decode_common_status(resp.status) { + pub(crate) fn decode_common_error( + resp: &ResponsePacket, + dispatched_to: &Option, + dispatched_from: &Option, + ) -> Error { + if let Err(e) = Self::decode_common_status(resp, dispatched_to, dispatched_from) { return e; }; - OpsCore::decode_error(resp) + OpsCore::decode_error(resp, dispatched_to, dispatched_from) } } diff --git a/sdk/couchbase-core/src/memdx/response.rs b/sdk/couchbase-core/src/memdx/response.rs index 42c53f90..f83fe676 100644 --- a/sdk/couchbase-core/src/memdx/response.rs +++ b/sdk/couchbase-core/src/memdx/response.rs @@ -5,7 +5,7 @@ use byteorder::{BigEndian, ReadBytesExt}; use crate::memdx::auth_mechanism::AuthMechanism; use crate::memdx::client_response::ClientResponse; -use crate::memdx::error::{Error, ErrorKind, ServerErrorKind}; +use crate::memdx::error::{Error, ErrorKind, ServerError, ServerErrorKind}; use crate::memdx::hello_feature::HelloFeature; use crate::memdx::ops_core::OpsCore; use crate::memdx::ops_crud::{decode_res_ext_frames, OpsCrud}; @@ -25,7 +25,11 @@ impl TryFromClientResponse for HelloResponse { let packet = resp.packet(); let status = packet.status; if status != Status::Success { - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } let mut features: Vec = Vec::new(); @@ -60,7 +64,11 @@ impl TryFromClientResponse for GetErrorMapResponse { let packet = resp.packet(); let status = packet.status; if status != Status::Success { - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } // TODO: Clone? @@ -82,7 +90,11 @@ impl TryFromClientResponse for SelectBucketResponse { if status == Status::AccessError || status == Status::KeyNotFound { return Err(ErrorKind::UnknownBucketName.into()); } - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } Ok(SelectBucketResponse {}) @@ -109,7 +121,11 @@ impl TryFromClientResponse for SASLAuthResponse { } if status != Status::Success { - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } Ok(SASLAuthResponse { @@ -131,7 +147,11 @@ impl TryFromClientResponse for SASLStepResponse { let packet = resp.packet(); let status = packet.status; if status != Status::Success { - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } Ok(SASLStepResponse { @@ -157,9 +177,19 @@ impl TryFromClientResponse for SASLListMechsResponse { // ns_server has not posted a configuration for the bucket to kv_engine yet. We // transform this into a ErrTmpFail as we make the assumption that the // SelectBucket will have failed if this was anything but a transient issue. - return Err(ServerErrorKind::ConfigNotSet.into()); + return Err(ServerError::new( + ServerErrorKind::ConfigNotSet, + packet, + resp.local_addr(), + resp.peer_addr(), + ) + .into()); } - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } // TODO: Clone? @@ -208,7 +238,11 @@ impl TryFromClientResponse for GetClusterConfigResponse { let packet = resp.packet(); let status = packet.status; if status != Status::Success { - return Err(OpsCore::decode_error(packet)); + return Err(OpsCore::decode_error( + packet, + resp.local_addr(), + resp.peer_addr(), + )); } let host = match resp.local_addr() { @@ -261,13 +295,35 @@ impl TryFromClientResponse for SetResponse { let status = packet.status; if status == Status::TooBig { - return Err(ServerErrorKind::TooBig.into()); + return Err(ServerError::new( + ServerErrorKind::TooBig, + resp.packet(), + resp.local_addr(), + resp.peer_addr(), + ) + .into()); } else if status == Status::Locked { - return Err(ServerErrorKind::Locked.into()); + return Err(ServerError::new( + ServerErrorKind::Locked, + resp.packet(), + resp.local_addr(), + resp.peer_addr(), + ) + .into()); } else if status == Status::KeyExists { - return Err(ServerErrorKind::KeyExists.into()); + return Err(ServerError::new( + ServerErrorKind::KeyExists, + resp.packet(), + resp.local_addr(), + resp.peer_addr(), + ) + .into()); } else if status != Status::Success { - return Err(OpsCrud::decode_common_error(resp.packet())); + return Err(OpsCrud::decode_common_error( + resp.packet(), + resp.local_addr(), + resp.peer_addr(), + )); } let mutation_token = if let Some(extras) = &packet.extras { @@ -320,9 +376,19 @@ impl TryFromClientResponse for GetResponse { let status = packet.status; if status == Status::KeyNotFound { - return Err(ServerErrorKind::KeyNotFound.into()); + return Err(ServerError::new( + ServerErrorKind::KeyNotFound, + resp.packet(), + resp.local_addr(), + resp.peer_addr(), + ) + .into()); } else if status != Status::Success { - return Err(OpsCrud::decode_common_error(resp.packet())); + return Err(OpsCrud::decode_common_error( + resp.packet(), + resp.local_addr(), + resp.peer_addr(), + )); } let flags = if let Some(extras) = &packet.extras { diff --git a/sdk/couchbase-core/src/memdx/status.rs b/sdk/couchbase-core/src/memdx/status.rs index 8e2a540b..d1a5c81f 100644 --- a/sdk/couchbase-core/src/memdx/status.rs +++ b/sdk/couchbase-core/src/memdx/status.rs @@ -1,4 +1,4 @@ -use std::fmt::{Display, Formatter}; +use std::fmt::{Display, Formatter, LowerHex}; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Status { @@ -18,6 +18,27 @@ pub enum Status { Unknown(u16), } +impl From for u16 { + fn from(value: Status) -> Self { + match value { + Status::Success => 0x00, + Status::KeyNotFound => 0x01, + Status::KeyExists => 0x02, + Status::TooBig => 0x03, + Status::InvalidArgs => 0x04, + Status::Locked => 0x09, + Status::NotMyVbucket => 0x07, + Status::AuthError => 0x20, + Status::SASLAuthContinue => 0x21, + Status::AccessError => 0x24, + Status::TmpFail => 0x86, + Status::CollectionUnknown => 0x88, + + Status::Unknown(value) => value, + } + } +} + impl From for Status { fn from(value: u16) -> Self { match value { diff --git a/sdk/couchbase-core/src/result.rs b/sdk/couchbase-core/src/result.rs deleted file mode 100644 index ec5ad716..00000000 --- a/sdk/couchbase-core/src/result.rs +++ /dev/null @@ -1 +0,0 @@ -pub type CoreResult = Result; diff --git a/sdk/couchbase-core/src/vbucketmap.rs b/sdk/couchbase-core/src/vbucketmap.rs index 17d69634..77741eca 100644 --- a/sdk/couchbase-core/src/vbucketmap.rs +++ b/sdk/couchbase-core/src/vbucketmap.rs @@ -1,5 +1,5 @@ -use crate::error::CoreError; -use crate::result::CoreResult; +use crate::error::Error; +use crate::error::Result; #[derive(Debug, Clone, Eq, PartialEq)] pub(crate) struct VbucketMap { @@ -8,10 +8,10 @@ pub(crate) struct VbucketMap { } impl VbucketMap { - pub fn new(entries: Vec>, num_replicas: usize) -> CoreResult { + pub fn new(entries: Vec>, num_replicas: usize) -> Result { if entries.is_empty() { - return Err(CoreError::Placeholder( - "vbucket map must have at least a single entry".to_string(), + return Err(Error::new_internal_error( + "vbucket map must have at least a single entry", )); } @@ -37,16 +37,16 @@ impl VbucketMap { self.num_replicas } - pub fn vbucket_by_key(&self, key: Vec) -> u16 { - let checksum = crc32fast::hash(key.as_slice()); + pub fn vbucket_by_key(&self, key: &[u8]) -> u16 { + let checksum = crc32fast::hash(key); let mid_bits = (checksum >> 16) as u16 & 0x7fff; mid_bits % (self.entries.len() as u16) } - pub fn node_by_vbucket(&self, vb_id: u16, vb_server_idx: u32) -> CoreResult { + pub fn node_by_vbucket(&self, vb_id: u16, vb_server_idx: u32) -> Result { let num_servers = (self.num_replicas as u32) + 1; if vb_server_idx > num_servers { - return Err(CoreError::Placeholder("invalid replica".to_string())); + return Err(Error::new_invalid_arguments_error("invalid replica")); } if let Some(idx) = self.entries.get(vb_id as usize) { @@ -56,7 +56,7 @@ impl VbucketMap { Ok(-1) } } else { - Err(CoreError::Placeholder("invalid vbucket".to_string())) + Err(Error::new_invalid_arguments_error("invalid vbucket")) } } } @@ -69,17 +69,15 @@ mod tests { fn vbucketmap_with_1024_vbs() { let vb_map = VbucketMap::new(vec![vec![]; 1024], 1).unwrap(); - assert_eq!(0x0202u16, vb_map.vbucket_by_key(vec![0])); + assert_eq!(0x0202u16, vb_map.vbucket_by_key(vec![0].as_slice())); assert_eq!( 0x00aau16, - vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7].as_slice()) ); - assert_eq!(0x0210u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!(0x0210u16, vb_map.vbucket_by_key(b"hello")); assert_eq!( 0x03d4u16, - vb_map.vbucket_by_key( - b"hello world, I am a super long key lets see if it works".to_vec() - ) + vb_map.vbucket_by_key(b"hello world, I am a super long key lets see if it works") ); } @@ -87,17 +85,15 @@ mod tests { fn vbucketmap_with_64_vbs() { let vb_map = VbucketMap::new(vec![vec![]; 64], 1).unwrap(); - assert_eq!(0x0002u16, vb_map.vbucket_by_key(vec![0])); + assert_eq!(0x0002u16, vb_map.vbucket_by_key(vec![0].as_slice())); assert_eq!( 0x002au16, - vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7].as_slice()) ); - assert_eq!(0x0010u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!(0x0010u16, vb_map.vbucket_by_key(b"hello")); assert_eq!( 0x0014u16, - vb_map.vbucket_by_key( - b"hello world, I am a super long key lets see if it works".to_vec() - ) + vb_map.vbucket_by_key(b"hello world, I am a super long key lets see if it works") ); } @@ -105,17 +101,15 @@ mod tests { fn vbucketmap_with_48_vbs() { let vb_map = VbucketMap::new(vec![vec![]; 48], 1).unwrap(); - assert_eq!(0x0012u16, vb_map.vbucket_by_key(vec![0])); + assert_eq!(0x0012u16, vb_map.vbucket_by_key(vec![0].as_slice())); assert_eq!( 0x000au16, - vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7].as_slice()) ); - assert_eq!(0x0010u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!(0x0010u16, vb_map.vbucket_by_key(b"hello")); assert_eq!( 0x0004u16, - vb_map.vbucket_by_key( - b"hello world, I am a super long key lets see if it works".to_vec() - ) + vb_map.vbucket_by_key(b"hello world, I am a super long key lets see if it works") ); } @@ -123,17 +117,15 @@ mod tests { fn vbucketmap_with_13_vbs() { let vb_map = VbucketMap::new(vec![vec![]; 13], 1).unwrap(); - assert_eq!(0x000cu16, vb_map.vbucket_by_key(vec![0])); + assert_eq!(0x000cu16, vb_map.vbucket_by_key(vec![0].as_slice())); assert_eq!( 0x0008u16, - vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7].as_slice()) ); - assert_eq!(0x0008u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!(0x0008u16, vb_map.vbucket_by_key(b"hello")); assert_eq!( 0x0003u16, - vb_map.vbucket_by_key( - b"hello world, I am a super long key lets see if it works".to_vec() - ) + vb_map.vbucket_by_key(b"hello world, I am a super long key lets see if it works") ); } } diff --git a/sdk/couchbase-core/src/vbucketrouter.rs b/sdk/couchbase-core/src/vbucketrouter.rs new file mode 100644 index 00000000..c129fe63 --- /dev/null +++ b/sdk/couchbase-core/src/vbucketrouter.rs @@ -0,0 +1,347 @@ +use std::future::Future; +use std::sync::{Arc, Mutex}; + +use crate::cbconfig::TerseConfig; +use crate::error::Error; +use crate::error::Result; +use crate::memdx::response::TryFromClientResponse; +use crate::vbucketmap::VbucketMap; + +pub(crate) trait VbucketRouter { + fn update_vbucket_info(&self, info: VbucketRoutingInfo); + fn dispatch_by_key(&self, key: &[u8], vbucket_server_idx: u32) -> Result<(String, u16)>; + fn dispatch_to_vbucket(&self, vb_id: u16) -> Result; + fn num_replicas(&self) -> usize; +} + +pub(crate) struct VbucketRoutingInfo { + pub vbucket_info: VbucketMap, + pub server_list: Vec, +} + +pub(crate) struct VbucketRouterOptions {} + +pub(crate) struct StdVbucketRouter { + routing_info: Arc>, +} + +impl StdVbucketRouter { + pub(crate) fn new(info: VbucketRoutingInfo, _opts: VbucketRouterOptions) -> Self { + Self { + routing_info: Arc::new(Mutex::new(info)), + } + } +} + +impl VbucketRouter for StdVbucketRouter { + fn update_vbucket_info(&self, info: VbucketRoutingInfo) { + *self.routing_info.lock().unwrap() = info; + } + + fn dispatch_by_key(&self, key: &[u8], vbucket_server_idx: u32) -> Result<(String, u16)> { + let info = self.routing_info.lock().unwrap(); + let vb_id = info.vbucket_info.vbucket_by_key(key); + let idx = info + .vbucket_info + .node_by_vbucket(vb_id, vbucket_server_idx)?; + + if idx >= 0 { + if let Some(server) = info.server_list.get(idx as usize) { + return Ok((server.clone(), vb_id)); + } + } + + Err(Error::new_internal_error("No server assigned")) + } + + fn dispatch_to_vbucket(&self, vb_id: u16) -> Result { + let info = self.routing_info.lock().unwrap(); + let idx = info.vbucket_info.node_by_vbucket(vb_id, 0)?; + + if idx > 0 { + if let Some(server) = info.server_list.get(idx as usize) { + return Ok(server.clone()); + } + } + + Err(Error::new_internal_error("No server assigned")) + } + + fn num_replicas(&self) -> usize { + let info = self.routing_info.lock().unwrap(); + info.vbucket_info.num_replicas() + } +} + +pub(crate) trait NotMyVbucketConfigHandler { + fn not_my_vbucket_config(&self, config: TerseConfig, source_hostname: &str); +} + +pub(crate) async fn orchestrate_memd_routing( + vb: &V, + ch: Option>, + key: &[u8], + vb_server_idx: u32, + mut operation: impl FnMut(String, u16) -> Fut, +) -> Result +where + V: VbucketRouter, + Fut: Future> + Send, +{ + let (mut endpoint, mut vb_id) = vb.dispatch_by_key(key, vb_server_idx)?; + + loop { + let err = match operation(endpoint.clone(), vb_id).await { + Ok(r) => return Ok(r), + Err(e) => e, + }; + + let ch = ch.clone(); + + if ch.is_none() { + return Err(err); + } + + let config = if let Some(memdx_err) = err.is_memdx_error() { + if memdx_err.is_notmyvbucket_error() { + if let Some(config) = memdx_err.has_server_config() { + config + } else { + return Err(err); + } + } else { + return Err(err); + } + } else { + return Err(err); + }; + + if config.is_empty() { + return Err(err); + } + + let value = match std::str::from_utf8(config.as_slice()) { + Ok(v) => v.to_string(), + Err(_e) => "".to_string(), + }; + + let config = value.replace("$HOST", endpoint.as_ref()); + + let config_json: TerseConfig = match serde_json::from_str(&config) { + Ok(c) => c, + Err(_) => { + return Err(err); + } + }; + + ch.unwrap().not_my_vbucket_config(config_json, &endpoint); + + let (new_endpoint, new_vb_id) = vb.dispatch_by_key(key, vb_server_idx)?; + if new_endpoint == endpoint && new_vb_id == vb_id { + return Err(err); + } + + endpoint = new_endpoint; + vb_id = new_vb_id; + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::ops::Add; + use std::sync::Arc; + use std::time::Duration; + + use tokio::sync::mpsc::unbounded_channel; + use tokio::time::Instant; + + use crate::authenticator::PasswordAuthenticator; + use crate::cbconfig::TerseConfig; + use crate::kvclient::{KvClientConfig, StdKvClient}; + use crate::kvclient_ops::KvClientOps; + use crate::kvclientmanager::{ + KvClientManager, KvClientManagerConfig, KvClientManagerOptions, orchestrate_memd_client, + StdKvClientManager, + }; + use crate::kvclientpool::NaiveKvClientPool; + use crate::memdx::client::Client; + use crate::memdx::packet::ResponsePacket; + use crate::memdx::request::{GetRequest, SetRequest}; + use crate::vbucketmap::VbucketMap; + use crate::vbucketrouter::{ + NotMyVbucketConfigHandler, orchestrate_memd_routing, StdVbucketRouter, VbucketRouter, + VbucketRouterOptions, VbucketRoutingInfo, + }; + + struct NVMBHandler {} + + impl NotMyVbucketConfigHandler for NVMBHandler { + fn not_my_vbucket_config(&self, config: TerseConfig, source_hostname: &str) {} + } + + #[test] + fn dispatch_to_key() { + let routing_info = VbucketRoutingInfo { + vbucket_info: VbucketMap::new( + vec![vec![0, 1], vec![1, 0], vec![0, 1], vec![0, 1], vec![1, 0]], + 1, + ) + .unwrap(), + server_list: vec!["endpoint1".to_string(), "endpoint2".to_string()], + }; + + let dispatcher = StdVbucketRouter::new(routing_info, VbucketRouterOptions {}); + + let (endpoint, vb_id) = dispatcher.dispatch_by_key(b"key1", 0).unwrap(); + + assert_eq!("endpoint2", endpoint); + assert_eq!(1, vb_id); + + let (endpoint, vb_id) = dispatcher.dispatch_by_key(b"key2", 0).unwrap(); + + assert_eq!("endpoint1", endpoint); + assert_eq!(3, vb_id); + + let (endpoint, vb_id) = dispatcher.dispatch_by_key(b"key2", 1).unwrap(); + + assert_eq!("endpoint2", endpoint); + assert_eq!(3, vb_id); + } + + struct Resp {} + + #[tokio::test] + async fn can_orchestrate_memd_routing() { + let _ = env_logger::try_init(); + + let instant = Instant::now().add(Duration::new(7, 0)); + + let (orphan_tx, mut orphan_rx) = unbounded_channel::(); + + tokio::spawn(async move { + loop { + match orphan_rx.recv().await { + Some(resp) => { + dbg!("unexpected orphan", resp); + } + None => { + return; + } + } + } + }); + + let client_config = KvClientConfig { + address: "192.168.107.128:11210" + .parse() + .expect("Failed to parse address"), + root_certs: None, + accept_all_certs: None, + client_name: "myclient".to_string(), + authenticator: Some(Arc::new(PasswordAuthenticator { + username: "Administrator".to_string(), + password: "password".to_string(), + })), + selected_bucket: Some("default".to_string()), + disable_default_features: false, + disable_error_map: false, + disable_bootstrap: false, + }; + + let mut client_configs = HashMap::new(); + client_configs.insert("192.168.107.128:11210".to_string(), client_config); + + let manger_config = KvClientManagerConfig { + num_pool_connections: 1, + clients: client_configs, + }; + + let manager: StdKvClientManager>> = + StdKvClientManager::new( + manger_config, + KvClientManagerOptions { + connect_timeout: Default::default(), + connect_throttle_period: Default::default(), + orphan_handler: Arc::new(orphan_tx), + }, + ) + .await + .unwrap(); + + let routing_info = VbucketRoutingInfo { + vbucket_info: VbucketMap::new( + vec![vec![0, 1], vec![1, 0], vec![0, 1], vec![0, 1], vec![1, 0]], + 1, + ) + .unwrap(), + server_list: vec!["192.168.107.128:11210".to_string()], + }; + + let dispatcher = StdVbucketRouter::new(routing_info, VbucketRouterOptions {}); + + let set_result = orchestrate_memd_routing( + &dispatcher, + Some(Arc::new(NVMBHandler {})), + b"test", + 0, + |endpoint: String, vb_id: u16| async { + orchestrate_memd_client( + &manager, + endpoint, + |client: Arc>| async move { + client + .set(SetRequest { + collection_id: 0, + key: "test".as_bytes().into(), + vbucket_id: 1, + flags: 0, + value: "test".as_bytes().into(), + datatype: 0, + expiry: None, + preserve_expiry: None, + cas: None, + on_behalf_of: None, + durability_level: None, + durability_level_timeout: None, + }) + .await + }, + ) + .await + }, + ) + .await + .unwrap(); + + dbg!(set_result); + + let get_result = orchestrate_memd_routing( + &dispatcher, + Some(Arc::new(NVMBHandler {})), + b"test", + 0, + |endpoint: String, vb_id: u16| async { + orchestrate_memd_client( + &manager, + endpoint, + |client: Arc>| async move { + client + .get(GetRequest { + collection_id: 0, + key: "test".as_bytes().into(), + vbucket_id: 1, + on_behalf_of: None, + }) + .await + }, + ) + .await + }, + ) + .await + .unwrap(); + + dbg!(get_result); + } +}