diff --git a/sdk/couchbase-core/Cargo.toml b/sdk/couchbase-core/Cargo.toml index 334621f9..2db04363 100644 --- a/sdk/couchbase-core/Cargo.toml +++ b/sdk/couchbase-core/Cargo.toml @@ -23,6 +23,8 @@ base64 = "0.22.1" webpki = "0.22.4" async-trait = "0.1.80" tokio-io = { version = "0.2.0-alpha.6", features = ["util"] } +crc32fast = "1.4.2" +serde_json = "1.0.120" [dev-dependencies] env_logger = "0.11" diff --git a/sdk/couchbase-core/src/cbconfig/mod.rs b/sdk/couchbase-core/src/cbconfig/mod.rs index b4cce517..f7ea03ac 100644 --- a/sdk/couchbase-core/src/cbconfig/mod.rs +++ b/sdk/couchbase-core/src/cbconfig/mod.rs @@ -5,132 +5,132 @@ use serde::Deserialize; #[derive(Deserialize, Debug)] pub struct VBucketServerMap { #[serde(alias = "hashAlgorithm")] - hash_algorithm: String, + pub hash_algorithm: String, #[serde(alias = "numReplicas")] - num_replicas: i8, + pub num_replicas: usize, #[serde(alias = "serverList")] - server_list: Vec, + pub server_list: Vec, #[serde(alias = "vBucketMap")] - vbucket_map: Vec>, + pub vbucket_map: Vec>, } #[derive(Deserialize, Debug)] pub struct ConfigDDocs { #[serde(alias = "uri")] - uri: String, + pub uri: String, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Default)] pub struct TerseExtNodePorts { #[serde(alias = "kv")] - kv: Option, + pub kv: Option, #[serde(alias = "capi")] - capi: Option, + pub capi: Option, #[serde(alias = "mgmt")] - mgmt: Option, + pub mgmt: i64, #[serde(alias = "n1ql")] - n1ql: Option, + pub n1ql: Option, #[serde(alias = "fts")] - fts: Option, + pub fts: Option, #[serde(alias = "cbas")] - cbas: Option, + pub cbas: Option, #[serde(alias = "eventingAdminPort")] - eventing: Option, + pub eventing: Option, #[serde(alias = "indexHttp")] - gsi: Option, + pub gsi: Option, #[serde(alias = "backupAPI")] - backup: Option, + pub backup: Option, #[serde(alias = "kvSSL")] - kv_ssl: Option, + pub kv_ssl: Option, #[serde(alias = "capiSSL")] - capi_ssl: Option, + pub capi_ssl: Option, #[serde(alias = "mgmtSSL")] - mgmt_ssl: Option, + pub mgmt_ssl: i64, #[serde(alias = "n1qlSSL")] - n1ql_ssl: Option, + pub n1ql_ssl: Option, #[serde(alias = "ftsSSL")] - fts_ssl: Option, + pub fts_ssl: Option, #[serde(alias = "cbasSSL")] - cbas_ssl: Option, + pub cbas_ssl: Option, #[serde(alias = "eventingSSL")] - eventing_ssl: Option, + pub eventing_ssl: Option, #[serde(alias = "indexHttps")] - gsi_ssl: Option, + pub gsi_ssl: Option, #[serde(alias = "backupAPIHTTPS")] - backup_ssl: Option, + pub backup_ssl: Option, } #[derive(Deserialize, Debug)] pub struct TerseExtNodeAltAddresses { #[serde(alias = "ports")] - ports: Option, + pub ports: TerseExtNodePorts, #[serde(alias = "hostname")] - hostname: Option, + pub hostname: String, } #[derive(Deserialize, Debug)] pub struct TerseNodePorts { #[serde(alias = "direct")] - direct: Option, + pub direct: Option, #[serde(alias = "proxy")] - proxy: Option, + pub proxy: Option, } #[derive(Deserialize, Debug)] pub struct TerseNodeConfig { #[serde(alias = "couchbaseApiBase")] - couchbase_api_base: Option, + pub couchbase_api_base: Option, #[serde(alias = "hostname")] - hostname: Option, + pub hostname: Option, #[serde(alias = "ports")] - ports: Option, + pub ports: Option, } #[derive(Deserialize, Debug)] pub struct TerseNodeExtConfig { #[serde(alias = "services")] - services: Option, + pub services: Option, #[serde(alias = "thisNode")] - this_node: Option, + pub this_node: Option, #[serde(alias = "hostname")] - hostname: Option, - #[serde(alias = "alternateAddresses")] - alternate_addresses: Option, + pub hostname: String, + #[serde(alias = "alternateAddresses", default)] + pub alternate_addresses: HashMap, } #[derive(Deserialize, Debug)] pub struct TerseConfig { #[serde(alias = "rev")] - rev: i64, + pub rev: i64, #[serde(alias = "revEpoch")] - rev_epoch: Option, + pub rev_epoch: Option, #[serde(alias = "name")] - name: Option, + pub name: Option, #[serde(alias = "nodeLocator")] - node_locator: Option, + pub node_locator: String, #[serde(alias = "uuid")] - uuid: Option, + pub uuid: Option, #[serde(alias = "uri")] - uri: Option, + pub uri: Option, #[serde(alias = "streamingUri")] - streaming_uri: Option, + pub streaming_uri: Option, #[serde(alias = "bucketCapabilitiesVer")] - bucket_capabilities_ver: Option, + pub bucket_capabilities_ver: Option, #[serde(alias = "bucketCapabilities")] - bucket_capabilities: Vec, + pub bucket_capabilities: Vec, #[serde(alias = "collectionsManifestUid")] - collections_manifest_uuid: Option, + pub collections_manifest_uuid: Option, #[serde(alias = "ddocs")] - ddocs: Option, + pub ddocs: Option, #[serde(alias = "vBucketServerMap")] - vbucket_server_map: Option, + pub vbucket_server_map: Option, #[serde(alias = "nodes")] - nodes: Vec, - #[serde(alias = "nodes_ext")] - nodes_ext: Vec, + pub nodes: Vec, + #[serde(alias = "nodesExt")] + pub nodes_ext: Vec, #[serde(alias = "clusterCapabilitiesVer")] - cluster_capabilities_ver: Vec, + pub cluster_capabilities_ver: Vec, #[serde(alias = "clusterCapabilities")] - cluster_capabilities: HashMap>, + pub cluster_capabilities: HashMap>, } diff --git a/sdk/couchbase-core/src/configparser.rs b/sdk/couchbase-core/src/configparser.rs new file mode 100644 index 00000000..0f82a576 --- /dev/null +++ b/sdk/couchbase-core/src/configparser.rs @@ -0,0 +1,136 @@ +use std::collections::HashMap; + +use crate::cbconfig::{TerseConfig, TerseExtNodePorts, VBucketServerMap}; +use crate::parsedconfig::{ + BucketType, ParsedConfig, ParsedConfigBucket, ParsedConfigFeatures, ParsedConfigNode, + ParsedConfigNodeAddresses, ParsedConfigNodePorts, +}; +use crate::result::CoreResult; +use crate::vbucketmap::VbucketMap; + +pub(crate) struct ConfigParser {} + +impl ConfigParser { + pub fn parse_terse_config( + config: TerseConfig, + source_hostname: &str, + ) -> CoreResult { + let rev_id = config.rev; + let rev_epoch = config.rev_epoch.unwrap_or_default(); + + let len_nodes = config.nodes.len(); + let mut nodes = Vec::with_capacity(config.nodes_ext.len()); + for (node_idx, node) in config.nodes_ext.into_iter().enumerate() { + let node_hostname = Self::parse_config_hostname(&node.hostname, source_hostname); + + let mut alt_addresses = HashMap::new(); + for (network_type, alt_addrs) in node.alternate_addresses { + let alt_hostname = Self::parse_config_hostname(&alt_addrs.hostname, &node_hostname); + let this_address = Self::parse_config_hosts_into(&alt_hostname, alt_addrs.ports); + + alt_addresses.insert(network_type, this_address); + } + + let this_node = ParsedConfigNode { + has_data: node_idx < len_nodes, + addresses: Self::parse_config_hosts_into( + &node_hostname, + node.services.unwrap_or_default(), + ), + alt_addresses, + }; + + nodes.push(this_node); + } + + let bucket = if let Some(bucket_name) = config.name { + let bucket_uuid = config.uuid.unwrap_or_default(); + let (bucket_type, vbucket_map) = match config.node_locator.as_str() { + "vbucket" => ( + BucketType::Couchbase, + Self::parse_vbucket_server_map(config.vbucket_server_map)?, + ), + _ => (BucketType::Invalid, None), + }; + + Some(ParsedConfigBucket { + bucket_uuid, + bucket_name, + bucket_type, + vbucket_map, + }) + } else { + None + }; + + let features = if let Some(caps) = config.cluster_capabilities.get("fts") { + ParsedConfigFeatures { + fts_vector_search: caps.contains(&"vectorSearch".to_string()), + } + } else { + ParsedConfigFeatures { + fts_vector_search: false, + } + }; + + Ok(ParsedConfig { + rev_id, + rev_epoch, + bucket, + nodes, + features, + }) + } + + fn parse_config_hostname(hostname: &str, source_hostname: &str) -> String { + if hostname.is_empty() { + return source_hostname.to_string(); + } + + if hostname.contains(':') { + return format!("[{}]", hostname); + } + + hostname.to_string() + } + + fn parse_config_hosts_into( + hostname: &str, + ports: TerseExtNodePorts, + ) -> ParsedConfigNodeAddresses { + ParsedConfigNodeAddresses { + hostname: hostname.to_string(), + non_ssl_ports: ParsedConfigNodePorts { + kv: ports.kv, + mgmt: ports.mgmt, + query: ports.n1ql, + search: ports.fts, + analytics: ports.cbas, + }, + ssl_ports: ParsedConfigNodePorts { + kv: ports.kv_ssl, + mgmt: ports.mgmt_ssl, + query: ports.n1ql_ssl, + search: ports.fts_ssl, + analytics: ports.cbas_ssl, + }, + } + } + + fn parse_vbucket_server_map( + vbucket_server_map: Option, + ) -> CoreResult> { + if let Some(vbucket_server_map) = vbucket_server_map { + if vbucket_server_map.vbucket_map.is_empty() { + return Ok(None); + } + + return Ok(Some(VbucketMap::new( + vbucket_server_map.vbucket_map, + vbucket_server_map.num_replicas, + )?)); + } + + Ok(None) + } +} diff --git a/sdk/couchbase-core/src/configwatcher.rs b/sdk/couchbase-core/src/configwatcher.rs new file mode 100644 index 00000000..71a2377c --- /dev/null +++ b/sdk/couchbase-core/src/configwatcher.rs @@ -0,0 +1,300 @@ +use std::cmp::Ordering; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use tokio::select; +use tokio::sync::broadcast; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::time::sleep; + +use crate::cbconfig::TerseConfig; +use crate::configparser::ConfigParser; +use crate::kvclient::KvClient; +use crate::kvclient_ops::KvClientOps; +use crate::kvclientmanager::KvClientManager; +use crate::memdx::request::GetClusterConfigRequest; +use crate::parsedconfig::ParsedConfig; +use crate::result::CoreResult; + +pub(crate) trait ConfigWatcher { + fn watch(&self, on_shutdown_rx: Receiver<()>) -> Receiver; + fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> impl Future>; +} + +pub(crate) struct ConfigWatcherMemdConfig { + pub endpoints: Vec, +} + +pub(crate) struct ConfigWatcherMemdOptions { + pub polling_period: Duration, + pub kv_client_manager: Arc, +} + +pub struct ConfigWatcherMemdInner { + pub polling_period: Duration, + pub kv_client_manager: Arc, + endpoints: Mutex>, +} + +impl ConfigWatcherMemdInner +where + M: KvClientManager, +{ + pub async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> CoreResult<()> { + let mut endpoints = self.endpoints.lock().unwrap(); + *endpoints = config.endpoints; + + Ok(()) + } + + pub async fn watch( + &self, + mut on_shutdown_rx: Receiver<()>, + on_new_config_tx: Sender, + ) { + let mut recent_endpoints = vec![]; + let mut all_endpoints_failed = true; + let mut last_sent_config = None; + + loop { + let mut endpoints = vec![]; + { + // Ensure the mutex isn't held across an await. + let endpoints_guard = self.endpoints.lock().unwrap(); + for endpoint in endpoints_guard.iter() { + endpoints.push(endpoint.clone()); + } + } + + if endpoints.is_empty() { + select! { + _ = on_shutdown_rx.recv() => { + return; + }, + _ = sleep(self.polling_period) => {} + } + } + + let mut remaining_endpoints = vec![]; + for endpoint in endpoints { + if !recent_endpoints.contains(&endpoint) { + remaining_endpoints.push(endpoint); + } + } + + let endpoint = if remaining_endpoints.is_empty() { + if all_endpoints_failed { + select! { + _ = on_shutdown_rx.recv() => { + return; + }, + _ = sleep(self.polling_period) => {} + } + } + + recent_endpoints = vec![]; + all_endpoints_failed = true; + continue; + } else { + remaining_endpoints.remove(0) + }; + + recent_endpoints.push(endpoint.clone()); + + let parsed_config = match self.poll_one(endpoint).await { + Ok(c) => c, + Err(e) => { + // TODO: log + dbg!(e); + continue; + } + }; + + all_endpoints_failed = false; + + if let Some(config) = &last_sent_config { + if let Some(cmp) = parsed_config.partial_cmp(config) { + if cmp == Ordering::Greater { + // TODO: log. + on_new_config_tx + .send(parsed_config.clone()) + .unwrap_or_default(); + last_sent_config = Some(parsed_config); + } + } + } else { + on_new_config_tx + .send(parsed_config.clone()) + .unwrap_or_default(); + last_sent_config = Some(parsed_config); + } + + select! { + _ = on_shutdown_rx.recv() => { + return; + }, + _ = sleep(self.polling_period) => {} + } + } + } + + async fn poll_one(&self, endpoint: String) -> CoreResult { + let client = self.kv_client_manager.get_client(endpoint).await?; + + let addr = client.remote_addr(); + let host = addr.ip(); + + let resp = client + .get_cluster_config(GetClusterConfigRequest {}) + .await?; + + let config: TerseConfig = serde_json::from_slice(resp.config.as_slice())?; + + ConfigParser::parse_terse_config(config, &host.to_string()) + } +} + +pub(crate) struct ConfigWatcherMemd { + inner: Arc>, +} + +impl ConfigWatcherMemd +where + M: KvClientManager + 'static, +{ + pub fn new(config: ConfigWatcherMemdConfig, opts: ConfigWatcherMemdOptions) -> Self { + Self { + inner: Arc::new(ConfigWatcherMemdInner { + polling_period: opts.polling_period, + kv_client_manager: opts.kv_client_manager, + endpoints: Mutex::new(config.endpoints), + }), + } + } +} + +impl ConfigWatcher for ConfigWatcherMemd +where + M: KvClientManager + 'static, +{ + fn watch(&self, on_shutdown_rx: Receiver<()>) -> Receiver { + let (on_new_config_tx, on_new_config_rx) = broadcast::channel::(1); + + let inner = self.inner.clone(); + tokio::spawn(async move { + inner.watch(on_shutdown_rx, on_new_config_tx).await; + }); + + on_new_config_rx + } + + async fn reconfigure(&self, config: ConfigWatcherMemdConfig) -> CoreResult<()> { + self.inner.reconfigure(config).await + } +} +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::time::Duration; + + use tokio::sync::broadcast; + use tokio::sync::mpsc::unbounded_channel; + use tokio::time::sleep; + + use crate::authenticator::PasswordAuthenticator; + use crate::configwatcher::{ + ConfigWatcher, ConfigWatcherMemd, ConfigWatcherMemdConfig, ConfigWatcherMemdOptions, + }; + use crate::kvclient::{KvClientConfig, StdKvClient}; + use crate::kvclientmanager::{ + KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager, + }; + use crate::kvclientpool::NaiveKvClientPool; + use crate::memdx::client::Client; + use crate::memdx::packet::ResponsePacket; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn fetches_configs() { + let client_config = KvClientConfig { + address: "192.168.107.128:11210" + .parse() + .expect("Failed to parse address"), + root_certs: None, + accept_all_certs: None, + client_name: "myclient".to_string(), + authenticator: Some(Arc::new(PasswordAuthenticator { + username: "Administrator".to_string(), + password: "password".to_string(), + })), + selected_bucket: Some("default".to_string()), + disable_default_features: false, + disable_error_map: false, + disable_bootstrap: false, + }; + + let mut client_configs = HashMap::new(); + client_configs.insert("192.168.107.128:11210".to_string(), client_config); + + let manger_config = KvClientManagerConfig { + num_pool_connections: 1, + clients: client_configs, + }; + + let (orphan_tx, mut orphan_rx) = unbounded_channel::(); + + tokio::spawn(async move { + loop { + match orphan_rx.recv().await { + Some(resp) => { + dbg!("unexpected orphan", resp); + } + None => { + return; + } + } + } + }); + + let manager: StdKvClientManager>> = + StdKvClientManager::new( + manger_config, + KvClientManagerOptions { + connect_timeout: Default::default(), + connect_throttle_period: Default::default(), + orphan_handler: Arc::new(orphan_tx), + }, + ) + .await + .unwrap(); + + let config = ConfigWatcherMemdConfig { + endpoints: vec!["192.168.107.128:11210".to_string()], + }; + let opts = ConfigWatcherMemdOptions { + polling_period: Duration::from_secs(1), + kv_client_manager: Arc::new(manager), + }; + let watcher = ConfigWatcherMemd::new(config, opts); + + let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + let mut receiver = watcher.watch(shutdown_rx); + + tokio::spawn(async move { + sleep(Duration::from_secs(5)).await; + shutdown_tx.send(()).unwrap(); + }); + + loop { + let config = match receiver.recv().await { + Ok(c) => c, + Err(e) => { + dbg!(e); + return; + } + }; + dbg!(config); + } + } +} diff --git a/sdk/couchbase-core/src/error.rs b/sdk/couchbase-core/src/error.rs index c6c85323..6272d1b9 100644 --- a/sdk/couchbase-core/src/error.rs +++ b/sdk/couchbase-core/src/error.rs @@ -1,6 +1,6 @@ use std::fmt::Display; -use crate::error::CoreError::{Dispatch, PlaceholderMemdxWrapper}; +use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper}; use crate::memdx::error::MemdxError; #[derive(thiserror::Error, Debug, Eq, PartialEq)] @@ -21,3 +21,9 @@ impl From for CoreError { } } } + +impl From for CoreError { + fn from(value: serde_json::Error) -> Self { + Placeholder(value.to_string()) + } +} diff --git a/sdk/couchbase-core/src/kvclient.rs b/sdk/couchbase-core/src/kvclient.rs index 8cb82fc1..870f672e 100644 --- a/sdk/couchbase-core/src/kvclient.rs +++ b/sdk/couchbase-core/src/kvclient.rs @@ -369,6 +369,7 @@ mod tests { use crate::authenticator::PasswordAuthenticator; use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions, StdKvClient}; + use crate::kvclient_ops::KvClientOps; use crate::memdx::client::Client; use crate::memdx::packet::ResponsePacket; use crate::memdx::request::{GetRequest, SetRequest}; diff --git a/sdk/couchbase-core/src/kvclient_ops.rs b/sdk/couchbase-core/src/kvclient_ops.rs index e1fb8fe7..94e1c872 100644 --- a/sdk/couchbase-core/src/kvclient_ops.rs +++ b/sdk/couchbase-core/src/kvclient_ops.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use crate::error::CoreError; use crate::kvclient::{KvClient, StdKvClient}; use crate::memdx::dispatcher::Dispatcher; @@ -6,10 +8,50 @@ use crate::memdx::op_bootstrap::{BootstrapOptions, OpBootstrap, OpBootstrapEncod use crate::memdx::ops_core::OpsCore; use crate::memdx::ops_crud::OpsCrud; use crate::memdx::pendingop::PendingOp; -use crate::memdx::request::{GetRequest, SelectBucketRequest, SetRequest}; -use crate::memdx::response::{BootstrapResult, GetResponse, SelectBucketResponse, SetResponse}; +use crate::memdx::request::{GetClusterConfigRequest, GetRequest, SelectBucketRequest, SetRequest}; +use crate::memdx::response::{ + BootstrapResult, GetClusterConfigResponse, GetResponse, SelectBucketResponse, SetResponse, +}; use crate::result::CoreResult; +pub(crate) trait KvClientOps: Sized + Send + Sync { + fn set(&self, req: SetRequest) -> impl Future> + Send; + fn get(&self, req: GetRequest) -> impl Future> + Send; + fn get_cluster_config( + &self, + req: GetClusterConfigRequest, + ) -> impl Future> + Send; +} + +impl KvClientOps for StdKvClient +where + D: Dispatcher, +{ + async fn set(&self, req: SetRequest) -> CoreResult { + let mut op = self.ops_crud().set(self.client(), req).await?; + + let res = op.recv().await?; + Ok(res) + } + + async fn get(&self, req: GetRequest) -> CoreResult { + let mut op = self.ops_crud().get(self.client(), req).await?; + + let res = op.recv().await?; + Ok(res) + } + + async fn get_cluster_config( + &self, + req: GetClusterConfigRequest, + ) -> CoreResult { + let mut op = OpsCore {}.get_cluster_config(self.client(), req).await?; + + let res = op.recv().await?; + Ok(res) + } +} + impl StdKvClient where D: Dispatcher, @@ -33,20 +75,6 @@ where Ok(res) } - pub async fn get(&self, req: GetRequest) -> CoreResult { - let mut op = self.ops_crud().get(self.client(), req).await?; - - let res = op.recv().await?; - Ok(res) - } - - pub async fn set(&self, req: SetRequest) -> CoreResult { - let mut op = self.ops_crud().set(self.client(), req).await?; - - let res = op.recv().await?; - Ok(res) - } - fn ops_crud(&self) -> OpsCrud { OpsCrud { collections_enabled: self.has_feature(HelloFeature::Collections), diff --git a/sdk/couchbase-core/src/kvclientmanager.rs b/sdk/couchbase-core/src/kvclientmanager.rs index 264f7968..5197df1e 100644 --- a/sdk/couchbase-core/src/kvclientmanager.rs +++ b/sdk/couchbase-core/src/kvclientmanager.rs @@ -279,6 +279,7 @@ mod tests { use crate::authenticator::PasswordAuthenticator; use crate::kvclient::{KvClient, KvClientConfig, StdKvClient}; + use crate::kvclient_ops::KvClientOps; use crate::kvclientmanager::{ KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager, }; diff --git a/sdk/couchbase-core/src/kvclientpool.rs b/sdk/couchbase-core/src/kvclientpool.rs index a9d2a7b6..7b9b502d 100644 --- a/sdk/couchbase-core/src/kvclientpool.rs +++ b/sdk/couchbase-core/src/kvclientpool.rs @@ -10,12 +10,13 @@ use tokio::time::{Instant, sleep}; use crate::error::CoreError; use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions}; +use crate::kvclient_ops::KvClientOps; use crate::memdx::dispatcher::Dispatcher; use crate::memdx::packet::ResponsePacket; use crate::result::CoreResult; pub(crate) trait KvClientPool: Sized + Send + Sync { - type Client: KvClient + Send + Sync; + type Client: KvClient + KvClientOps + Send + Sync; fn new( config: KvClientPoolConfig, @@ -263,7 +264,7 @@ where impl KvClientPool for NaiveKvClientPool where - K: KvClient + PartialEq + Sync + Send + 'static, + K: KvClient + KvClientOps + PartialEq + Sync + Send + 'static, { type Client = K; @@ -323,6 +324,7 @@ mod tests { use crate::authenticator::PasswordAuthenticator; use crate::kvclient::{KvClient, KvClientConfig, StdKvClient}; + use crate::kvclient_ops::KvClientOps; use crate::kvclientpool::{ KvClientPool, KvClientPoolConfig, KvClientPoolOptions, NaiveKvClientPool, }; diff --git a/sdk/couchbase-core/src/lib.rs b/sdk/couchbase-core/src/lib.rs index 17ca4bc4..97ad3b07 100644 --- a/sdk/couchbase-core/src/lib.rs +++ b/sdk/couchbase-core/src/lib.rs @@ -1,11 +1,15 @@ pub mod authenticator; pub mod cbconfig; +mod configparser; +mod configwatcher; mod error; mod kvclient; mod kvclient_ops; mod kvclientmanager; mod kvclientpool; pub mod memdx; +mod parsedconfig; pub mod result; mod scram; pub mod service_type; +mod vbucketmap; diff --git a/sdk/couchbase-core/src/parsedconfig.rs b/sdk/couchbase-core/src/parsedconfig.rs new file mode 100644 index 00000000..38977f2e --- /dev/null +++ b/sdk/couchbase-core/src/parsedconfig.rs @@ -0,0 +1,145 @@ +use std::cmp::Ordering; +use std::collections::HashMap; + +use crate::vbucketmap::VbucketMap; + +#[derive(Debug, Clone, Eq, PartialEq)] +#[non_exhaustive] +pub enum BucketType { + Invalid, + Couchbase, +} + +#[derive(Debug, Clone, Eq, PartialEq, Default)] +pub(crate) struct ParsedConfigNodePorts { + pub kv: Option, + pub mgmt: i64, + pub query: Option, + pub search: Option, + pub analytics: Option, + // TODO: Do we need views? +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct ParsedConfigNodeAddresses { + pub hostname: String, + pub non_ssl_ports: ParsedConfigNodePorts, + pub ssl_ports: ParsedConfigNodePorts, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct ParsedConfigNode { + pub has_data: bool, + pub addresses: ParsedConfigNodeAddresses, + pub alt_addresses: HashMap, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct ParsedConfigFeatures { + pub fts_vector_search: bool, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct NetworkConfigNode { + pub node_id: String, + pub hostname: String, + pub has_data: bool, + pub non_ssl_ports: ParsedConfigNodePorts, + pub ssl_ports: ParsedConfigNodePorts, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct NetworkConfig { + pub nodes: Vec, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct ParsedConfigBucket { + pub bucket_uuid: String, + pub bucket_name: String, + pub bucket_type: BucketType, + pub vbucket_map: Option, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct ParsedConfig { + pub rev_id: i64, + pub rev_epoch: i64, + + pub bucket: Option, + + pub nodes: Vec, + + pub features: ParsedConfigFeatures, +} + +impl ParsedConfig { + pub fn is_versioned(&self) -> bool { + self.rev_epoch > 0 && self.rev_id > 0 + } + + pub fn addresses_group_for_network_type(&self, network_type: &str) -> NetworkConfig { + let mut nodes = Vec::with_capacity(self.nodes.len()); + for node in &self.nodes { + let node_id = format!( + "ep-{}-{}", + node.addresses.hostname, node.addresses.non_ssl_ports.mgmt + ); + + let node_info = if network_type == "default" { + NetworkConfigNode { + node_id, + hostname: node.addresses.hostname.clone(), + has_data: node.has_data, + non_ssl_ports: node.addresses.non_ssl_ports.clone(), + ssl_ports: node.addresses.ssl_ports.clone(), + } + } else if let Some(alt_info) = node.alt_addresses.get(network_type) { + NetworkConfigNode { + node_id, + hostname: alt_info.hostname.clone(), + has_data: node.has_data, + non_ssl_ports: alt_info.non_ssl_ports.clone(), + ssl_ports: alt_info.ssl_ports.clone(), + } + } else { + NetworkConfigNode { + node_id, + hostname: "".to_string(), + has_data: node.has_data, + non_ssl_ports: ParsedConfigNodePorts::default(), + ssl_ports: ParsedConfigNodePorts::default(), + } + }; + + nodes.push(node_info); + } + + NetworkConfig { nodes } + } +} + +impl PartialOrd for ParsedConfig { + fn partial_cmp(&self, other: &ParsedConfig) -> Option { + match self.rev_epoch.cmp(&other.rev_epoch) { + Ordering::Less => { + return Some(Ordering::Less); + } + Ordering::Greater => { + return Some(Ordering::Greater); + } + Ordering::Equal => {} + } + match self.rev_id.cmp(&other.rev_id) { + Ordering::Less => { + return Some(Ordering::Less); + } + Ordering::Greater => { + return Some(Ordering::Greater); + } + Ordering::Equal => {} + } + + Some(Ordering::Equal) + } +} diff --git a/sdk/couchbase-core/src/vbucketmap.rs b/sdk/couchbase-core/src/vbucketmap.rs new file mode 100644 index 00000000..17d69634 --- /dev/null +++ b/sdk/couchbase-core/src/vbucketmap.rs @@ -0,0 +1,139 @@ +use crate::error::CoreError; +use crate::result::CoreResult; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct VbucketMap { + entries: Vec>, + num_replicas: usize, +} + +impl VbucketMap { + pub fn new(entries: Vec>, num_replicas: usize) -> CoreResult { + if entries.is_empty() { + return Err(CoreError::Placeholder( + "vbucket map must have at least a single entry".to_string(), + )); + } + + Ok(Self { + entries, + num_replicas, + }) + } + + pub fn is_valid(&self) -> bool { + if let Some(entry) = self.entries.first() { + return !entry.is_empty(); + } + + false + } + + pub fn num_vbuckets(&self) -> usize { + self.entries.len() + } + + pub fn num_replicas(&self) -> usize { + self.num_replicas + } + + pub fn vbucket_by_key(&self, key: Vec) -> u16 { + let checksum = crc32fast::hash(key.as_slice()); + let mid_bits = (checksum >> 16) as u16 & 0x7fff; + mid_bits % (self.entries.len() as u16) + } + + pub fn node_by_vbucket(&self, vb_id: u16, vb_server_idx: u32) -> CoreResult { + let num_servers = (self.num_replicas as u32) + 1; + if vb_server_idx > num_servers { + return Err(CoreError::Placeholder("invalid replica".to_string())); + } + + if let Some(idx) = self.entries.get(vb_id as usize) { + if let Some(id) = idx.get(vb_server_idx as usize) { + Ok(*id) + } else { + Ok(-1) + } + } else { + Err(CoreError::Placeholder("invalid vbucket".to_string())) + } + } +} + +#[cfg(test)] +mod tests { + use crate::vbucketmap::VbucketMap; + + #[test] + fn vbucketmap_with_1024_vbs() { + let vb_map = VbucketMap::new(vec![vec![]; 1024], 1).unwrap(); + + assert_eq!(0x0202u16, vb_map.vbucket_by_key(vec![0])); + assert_eq!( + 0x00aau16, + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + ); + assert_eq!(0x0210u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!( + 0x03d4u16, + vb_map.vbucket_by_key( + b"hello world, I am a super long key lets see if it works".to_vec() + ) + ); + } + + #[test] + fn vbucketmap_with_64_vbs() { + let vb_map = VbucketMap::new(vec![vec![]; 64], 1).unwrap(); + + assert_eq!(0x0002u16, vb_map.vbucket_by_key(vec![0])); + assert_eq!( + 0x002au16, + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + ); + assert_eq!(0x0010u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!( + 0x0014u16, + vb_map.vbucket_by_key( + b"hello world, I am a super long key lets see if it works".to_vec() + ) + ); + } + + #[test] + fn vbucketmap_with_48_vbs() { + let vb_map = VbucketMap::new(vec![vec![]; 48], 1).unwrap(); + + assert_eq!(0x0012u16, vb_map.vbucket_by_key(vec![0])); + assert_eq!( + 0x000au16, + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + ); + assert_eq!(0x0010u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!( + 0x0004u16, + vb_map.vbucket_by_key( + b"hello world, I am a super long key lets see if it works".to_vec() + ) + ); + } + + #[test] + fn vbucketmap_with_13_vbs() { + let vb_map = VbucketMap::new(vec![vec![]; 13], 1).unwrap(); + + assert_eq!(0x000cu16, vb_map.vbucket_by_key(vec![0])); + assert_eq!( + 0x0008u16, + vb_map.vbucket_by_key(vec![0, 1, 2, 3, 4, 5, 6, 7]) + ); + assert_eq!(0x0008u16, vb_map.vbucket_by_key(b"hello".to_vec())); + assert_eq!( + 0x0003u16, + vb_map.vbucket_by_key( + b"hello world, I am a super long key lets see if it works".to_vec() + ) + ); + } +}