Skip to content

RSCBC-9: Implement memd routing #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions sdk/couchbase-core/src/authenticator.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::fmt::Debug;

use crate::result::CoreResult;
use crate::error::Result;
use crate::service_type::ServiceType;

pub trait Authenticator: Debug + Send + Sync {
// TODO: get_client_certificate needs some thought about how to expose the certificate
// fn get_client_certificate(service: ServiceType, host_port: String) ->
fn get_credentials(
&self,
service_type: ServiceType,
host_port: String,
) -> CoreResult<UserPassPair>;
fn get_credentials(&self, service_type: ServiceType, host_port: String)
-> Result<UserPassPair>;
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -30,7 +27,7 @@ impl Authenticator for PasswordAuthenticator {
&self,
_service_type: ServiceType,
_host_port: String,
) -> CoreResult<UserPassPair> {
) -> Result<UserPassPair> {
Ok(UserPassPair {
username: self.username.clone(),
password: self.password.clone(),
Expand Down
9 changes: 3 additions & 6 deletions sdk/couchbase-core/src/configparser.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
use std::collections::HashMap;

use crate::cbconfig::{TerseConfig, TerseExtNodePorts, VBucketServerMap};
use crate::error::Result;
use crate::parsedconfig::{
BucketType, ParsedConfig, ParsedConfigBucket, ParsedConfigFeatures, ParsedConfigNode,
ParsedConfigNodeAddresses, ParsedConfigNodePorts,
};
use crate::result::CoreResult;
use crate::vbucketmap::VbucketMap;

pub(crate) struct ConfigParser {}

impl ConfigParser {
pub fn parse_terse_config(
config: TerseConfig,
source_hostname: &str,
) -> CoreResult<ParsedConfig> {
pub fn parse_terse_config(config: TerseConfig, source_hostname: &str) -> Result<ParsedConfig> {
let rev_id = config.rev;
let rev_epoch = config.rev_epoch.unwrap_or_default();

Expand Down Expand Up @@ -119,7 +116,7 @@ impl ConfigParser {

fn parse_vbucket_server_map(
vbucket_server_map: Option<VBucketServerMap>,
) -> CoreResult<Option<VbucketMap>> {
) -> Result<Option<VbucketMap>> {
if let Some(vbucket_server_map) = vbucket_server_map {
if vbucket_server_map.vbucket_map.is_empty() {
return Ok(None);
Expand Down
10 changes: 5 additions & 5 deletions sdk/couchbase-core/src/configwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use tokio::time::sleep;

use crate::cbconfig::TerseConfig;
use crate::configparser::ConfigParser;
use crate::error::Result;
use crate::kvclient::KvClient;
use crate::kvclient_ops::KvClientOps;
use crate::kvclientmanager::KvClientManager;
use crate::memdx::request::GetClusterConfigRequest;
use crate::parsedconfig::ParsedConfig;
use crate::result::CoreResult;

pub(crate) trait ConfigWatcher {
fn watch(&self, on_shutdown_rx: Receiver<()>) -> Receiver<ParsedConfig>;
fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> impl Future<Output = CoreResult<()>>;
fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> impl Future<Output = Result<()>>;
}

pub(crate) struct ConfigWatcherMemdConfig {
Expand All @@ -41,7 +41,7 @@ impl<M> ConfigWatcherMemdInner<M>
where
M: KvClientManager,
{
pub async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> CoreResult<()> {
pub async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> Result<()> {
let mut endpoints = self.endpoints.lock().unwrap();
*endpoints = config.endpoints;

Expand Down Expand Up @@ -139,7 +139,7 @@ where
}
}

async fn poll_one(&self, endpoint: String) -> CoreResult<ParsedConfig> {
async fn poll_one(&self, endpoint: String) -> Result<ParsedConfig> {
let client = self.kv_client_manager.get_client(endpoint).await?;

let addr = client.remote_addr();
Expand Down Expand Up @@ -189,7 +189,7 @@ where
on_new_config_rx
}

async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> CoreResult<()> {
async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> Result<()> {
self.inner.reconfigure(config).await
}
}
Expand Down
107 changes: 89 additions & 18 deletions sdk/couchbase-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,100 @@
use std::fmt::Display;

use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper};
use crate::memdx::error::{Error, ErrorKind};

#[derive(thiserror::Error, Debug)]
pub enum CoreError {
#[error("Dispatch error {0}")]
Dispatch(Error),
#[error("Placeholder error {0}")]
Placeholder(String),
#[error("Placeholder memdx wrapper error {0}")]
PlaceholderMemdxWrapper(Error),
use crate::memdx::error::Error as MemdxError;

pub type Result<T> = std::result::Result<T, Error>;

#[derive(thiserror::Error, Debug, Clone)]
#[error("{kind}")]
#[non_exhaustive]
pub struct Error {
pub kind: Box<ErrorKind>,
// #[source]
// pub(crate) source: Option<Box<dyn StdError + 'static>>,
}

impl From<Error> for CoreError {
fn from(value: Error) -> Self {
match value.kind.as_ref() {
ErrorKind::Dispatch(_) => Dispatch(value),
_ => PlaceholderMemdxWrapper(value),
impl Error {
pub(crate) fn new(kind: ErrorKind) -> Self {
Self {
kind: Box::new(kind),
// source: None,
}
}

// pub(crate) fn with_source(kind: ErrorKind, source: impl std::error::Error + 'static) -> Self {
// Self {
// kind: Box::new(kind),
// source: Some(Box::new(source)),
// }
// }

pub fn is_memdx_error(&self) -> Option<&MemdxError> {
match self.kind.as_ref() {
ErrorKind::MemdxError(e) => Some(e),
_ => None,
}
}

pub(crate) fn new_invalid_arguments_error(msg: &str) -> Self {
Self::new(ErrorKind::InvalidArgument {
msg: msg.to_string(),
})
}

pub(crate) fn new_internal_error(msg: &str) -> Self {
Self::new(ErrorKind::Internal {
msg: msg.to_string(),
})
}
}

#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum ErrorKind {
#[error("Vbucket map outdated")]
VbucketMapOutdated,
#[error("An error occurred during serialization/deserialization {msg}")]
#[non_exhaustive]
JSONError { msg: String },
#[error("Invalid argument {msg}")]
#[non_exhaustive]
InvalidArgument { msg: String },
#[error("{0}")]
MemdxError(MemdxError),
#[error("Endpoint not known {endpoint}")]
#[non_exhaustive]
EndpointNotKnown { endpoint: String },
#[error("no endpoints available")]
#[non_exhaustive]
NoEndpointsAvailable,
#[error("Shutdown")]
Shutdown,
#[error("Internal error {msg}")]
#[non_exhaustive]
Internal { msg: String },
}

impl<E> From<E> for Error
where
ErrorKind: From<E>,
{
fn from(err: E) -> Self {
Self {
kind: Box::new(err.into()),
}
}
}

impl From<MemdxError> for Error {
fn from(value: MemdxError) -> Self {
Self::new(ErrorKind::MemdxError(value))
}
}

impl From<serde_json::Error> for CoreError {
impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Placeholder(value.to_string())
Self::new(ErrorKind::JSONError {
msg: value.to_string(),
})
}
}
39 changes: 16 additions & 23 deletions sdk/couchbase-core/src/kvclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use tokio_rustls::rustls::RootCertStore;
use uuid::Uuid;

use crate::authenticator::Authenticator;
use crate::error::CoreError;
use crate::error::Error;
use crate::error::Result;
use crate::memdx::auth_mechanism::AuthMechanism;
use crate::memdx::connection::{Connection, ConnectOptions};
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions};
Expand All @@ -21,7 +22,6 @@ use crate::memdx::op_auth_saslauto::SASLAuthAutoOptions;
use crate::memdx::op_bootstrap::BootstrapOptions;
use crate::memdx::packet::ResponsePacket;
use crate::memdx::request::{GetErrorMapRequest, HelloRequest, SelectBucketRequest};
use crate::result::CoreResult;
use crate::service_type::ServiceType;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -62,13 +62,13 @@ pub(crate) trait KvClient: Sized + PartialEq + Send + Sync {
fn new(
config: KvClientConfig,
opts: KvClientOptions,
) -> impl Future<Output = CoreResult<Self>> + Send;
fn reconfigure(&self, config: KvClientConfig) -> impl Future<Output = CoreResult<()>> + Send;
) -> impl Future<Output = Result<Self>> + Send;
fn reconfigure(&self, config: KvClientConfig) -> impl Future<Output = Result<()>> + Send;
fn has_feature(&self, feature: HelloFeature) -> bool;
fn load_factor(&self) -> f64;
fn remote_addr(&self) -> SocketAddr;
fn local_addr(&self) -> Option<SocketAddr>;
fn close(&self) -> impl Future<Output = CoreResult<()>> + Send;
fn close(&self) -> impl Future<Output = Result<()>> + Send;
fn id(&self) -> &str;
}

Expand All @@ -89,8 +89,6 @@ pub(crate) struct StdKvClient<D: Dispatcher> {
// asynchronously and we do not support changing selected buckets.
selected_bucket: Mutex<Option<String>>,

closed: Arc<AtomicBool>,

id: String,
}

Expand All @@ -107,7 +105,7 @@ impl<D> KvClient for StdKvClient<D>
where
D: Dispatcher,
{
async fn new(config: KvClientConfig, opts: KvClientOptions) -> CoreResult<StdKvClient<D>> {
async fn new(config: KvClientConfig, opts: KvClientOptions) -> Result<StdKvClient<D>> {
let requested_features = if config.disable_default_features {
vec![]
} else {
Expand Down Expand Up @@ -170,8 +168,8 @@ where

if should_bootstrap && config.disable_bootstrap {
// TODO: error model needs thought.
return Err(CoreError::Placeholder(
"Bootstrap was disabled but options requiring bootstrap were specified".to_string(),
return Err(Error::new_invalid_arguments_error(
"Bootstrap was disabled but options requiring bootstrap were specified",
));
}

Expand Down Expand Up @@ -212,7 +210,6 @@ where
current_config: Mutex::new(config),
supported_features: vec![],
selected_bucket: Mutex::new(None),
closed,
id: id.clone(),
};

Expand Down Expand Up @@ -260,7 +257,7 @@ where
Ok(kv_cli)
}

async fn reconfigure(&self, config: KvClientConfig) -> CoreResult<()> {
async fn reconfigure(&self, config: KvClientConfig) -> Result<()> {
let mut current_config = self.current_config.lock().await;

// TODO: compare root certs or something somehow.
Expand All @@ -271,15 +268,15 @@ where
&& current_config.disable_error_map == config.disable_error_map
&& current_config.disable_bootstrap == config.disable_bootstrap)
{
return Err(CoreError::Placeholder(
"Cannot reconfigure due to conflicting options".to_string(),
return Err(Error::new_invalid_arguments_error(
"Cannot reconfigure due to conflicting options",
));
}

let selected_bucket_name = if current_config.selected_bucket != config.selected_bucket {
if current_config.selected_bucket.is_some() {
return Err(CoreError::Placeholder(
"Cannot reconfigure from one selected bucket to another".to_string(),
return Err(Error::new_invalid_arguments_error(
"Cannot reconfigure from one selected bucket to another",
));
}

Expand All @@ -292,8 +289,8 @@ where
};

if *current_config.deref() != config {
return Err(CoreError::Placeholder(
"Client config after reconfigure did not match new configuration".to_string(),
return Err(Error::new_invalid_arguments_error(
"Client config after reconfigure did not match new configuration",
));
}

Expand Down Expand Up @@ -336,11 +333,7 @@ where
self.local_addr
}

async fn close(&self) -> CoreResult<()> {
if self.closed.swap(true, Ordering::Relaxed) {
return Err(CoreError::Placeholder("Client closed".to_string()));
}

async fn close(&self) -> Result<()> {
Ok(self.cli.close().await?)
}

Expand Down
Loading
Loading