Skip to content

Implement config watcher memd #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/couchbase-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ base64 = "0.22.1"
webpki = "0.22.4"
async-trait = "0.1.80"
tokio-io = { version = "0.2.0-alpha.6", features = ["util"] }
crc32fast = "1.4.2"
serde_json = "1.0.120"

[dev-dependencies]
env_logger = "0.11"
Expand Down
106 changes: 53 additions & 53 deletions sdk/couchbase-core/src/cbconfig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,132 +5,132 @@ use serde::Deserialize;
#[derive(Deserialize, Debug)]
pub struct VBucketServerMap {
#[serde(alias = "hashAlgorithm")]
hash_algorithm: String,
pub hash_algorithm: String,
#[serde(alias = "numReplicas")]
num_replicas: i8,
pub num_replicas: usize,
#[serde(alias = "serverList")]
server_list: Vec<String>,
pub server_list: Vec<String>,
#[serde(alias = "vBucketMap")]
vbucket_map: Vec<Vec<i8>>,
pub vbucket_map: Vec<Vec<i16>>,
}

#[derive(Deserialize, Debug)]
pub struct ConfigDDocs {
#[serde(alias = "uri")]
uri: String,
pub uri: String,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Default)]
pub struct TerseExtNodePorts {
#[serde(alias = "kv")]
kv: Option<i16>,
pub kv: Option<i64>,
#[serde(alias = "capi")]
capi: Option<i16>,
pub capi: Option<i64>,
#[serde(alias = "mgmt")]
mgmt: Option<i16>,
pub mgmt: i64,
#[serde(alias = "n1ql")]
n1ql: Option<i16>,
pub n1ql: Option<i64>,
#[serde(alias = "fts")]
fts: Option<i16>,
pub fts: Option<i64>,
#[serde(alias = "cbas")]
cbas: Option<i16>,
pub cbas: Option<i64>,
#[serde(alias = "eventingAdminPort")]
eventing: Option<i16>,
pub eventing: Option<i64>,
#[serde(alias = "indexHttp")]
gsi: Option<i16>,
pub gsi: Option<i64>,
#[serde(alias = "backupAPI")]
backup: Option<i16>,
pub backup: Option<i64>,

#[serde(alias = "kvSSL")]
kv_ssl: Option<i16>,
pub kv_ssl: Option<i64>,
#[serde(alias = "capiSSL")]
capi_ssl: Option<i16>,
pub capi_ssl: Option<i64>,
#[serde(alias = "mgmtSSL")]
mgmt_ssl: Option<i16>,
pub mgmt_ssl: i64,
#[serde(alias = "n1qlSSL")]
n1ql_ssl: Option<i16>,
pub n1ql_ssl: Option<i64>,
#[serde(alias = "ftsSSL")]
fts_ssl: Option<i16>,
pub fts_ssl: Option<i64>,
#[serde(alias = "cbasSSL")]
cbas_ssl: Option<i16>,
pub cbas_ssl: Option<i64>,
#[serde(alias = "eventingSSL")]
eventing_ssl: Option<i16>,
pub eventing_ssl: Option<i64>,
#[serde(alias = "indexHttps")]
gsi_ssl: Option<i16>,
pub gsi_ssl: Option<i64>,
#[serde(alias = "backupAPIHTTPS")]
backup_ssl: Option<i16>,
pub backup_ssl: Option<i64>,
}

#[derive(Deserialize, Debug)]
pub struct TerseExtNodeAltAddresses {
#[serde(alias = "ports")]
ports: Option<TerseExtNodePorts>,
pub ports: TerseExtNodePorts,
#[serde(alias = "hostname")]
hostname: Option<String>,
pub hostname: String,
}

#[derive(Deserialize, Debug)]
pub struct TerseNodePorts {
#[serde(alias = "direct")]
direct: Option<u16>,
pub direct: Option<u16>,
#[serde(alias = "proxy")]
proxy: Option<u16>,
pub proxy: Option<u16>,
}

#[derive(Deserialize, Debug)]
pub struct TerseNodeConfig {
#[serde(alias = "couchbaseApiBase")]
couchbase_api_base: Option<String>,
pub couchbase_api_base: Option<String>,
#[serde(alias = "hostname")]
hostname: Option<String>,
pub hostname: Option<String>,
#[serde(alias = "ports")]
ports: Option<TerseNodePorts>,
pub ports: Option<TerseNodePorts>,
}

#[derive(Deserialize, Debug)]
pub struct TerseNodeExtConfig {
#[serde(alias = "services")]
services: Option<TerseExtNodePorts>,
pub services: Option<TerseExtNodePorts>,
#[serde(alias = "thisNode")]
this_node: Option<bool>,
pub this_node: Option<bool>,
#[serde(alias = "hostname")]
hostname: Option<String>,
#[serde(alias = "alternateAddresses")]
alternate_addresses: Option<TerseExtNodeAltAddresses>,
pub hostname: String,
#[serde(alias = "alternateAddresses", default)]
pub alternate_addresses: HashMap<String, TerseExtNodeAltAddresses>,
}

#[derive(Deserialize, Debug)]
pub struct TerseConfig {
#[serde(alias = "rev")]
rev: i64,
pub rev: i64,
#[serde(alias = "revEpoch")]
rev_epoch: Option<i64>,
pub rev_epoch: Option<i64>,
#[serde(alias = "name")]
name: Option<String>,
pub name: Option<String>,
#[serde(alias = "nodeLocator")]
node_locator: Option<String>,
pub node_locator: String,
#[serde(alias = "uuid")]
uuid: Option<String>,
pub uuid: Option<String>,
#[serde(alias = "uri")]
uri: Option<String>,
pub uri: Option<String>,
#[serde(alias = "streamingUri")]
streaming_uri: Option<String>,
pub streaming_uri: Option<String>,
#[serde(alias = "bucketCapabilitiesVer")]
bucket_capabilities_ver: Option<String>,
pub bucket_capabilities_ver: Option<String>,
#[serde(alias = "bucketCapabilities")]
bucket_capabilities: Vec<String>,
pub bucket_capabilities: Vec<String>,
#[serde(alias = "collectionsManifestUid")]
collections_manifest_uuid: Option<String>,
pub collections_manifest_uuid: Option<String>,
#[serde(alias = "ddocs")]
ddocs: Option<ConfigDDocs>,
pub ddocs: Option<ConfigDDocs>,
#[serde(alias = "vBucketServerMap")]
vbucket_server_map: Option<VBucketServerMap>,
pub vbucket_server_map: Option<VBucketServerMap>,
#[serde(alias = "nodes")]
nodes: Vec<TerseNodeConfig>,
#[serde(alias = "nodes_ext")]
nodes_ext: Vec<TerseNodeExtConfig>,
pub nodes: Vec<TerseNodeConfig>,
#[serde(alias = "nodesExt")]
pub nodes_ext: Vec<TerseNodeExtConfig>,
#[serde(alias = "clusterCapabilitiesVer")]
cluster_capabilities_ver: Vec<i64>,
pub cluster_capabilities_ver: Vec<i64>,
#[serde(alias = "clusterCapabilities")]
cluster_capabilities: HashMap<String, Vec<String>>,
pub cluster_capabilities: HashMap<String, Vec<String>>,
}
136 changes: 136 additions & 0 deletions sdk/couchbase-core/src/configparser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use std::collections::HashMap;

use crate::cbconfig::{TerseConfig, TerseExtNodePorts, VBucketServerMap};
use crate::parsedconfig::{
BucketType, ParsedConfig, ParsedConfigBucket, ParsedConfigFeatures, ParsedConfigNode,
ParsedConfigNodeAddresses, ParsedConfigNodePorts,
};
use crate::result::CoreResult;
use crate::vbucketmap::VbucketMap;

pub(crate) struct ConfigParser {}

impl ConfigParser {
pub fn parse_terse_config(
config: TerseConfig,
source_hostname: &str,
) -> CoreResult<ParsedConfig> {
let rev_id = config.rev;
let rev_epoch = config.rev_epoch.unwrap_or_default();

let len_nodes = config.nodes.len();
let mut nodes = Vec::with_capacity(config.nodes_ext.len());
for (node_idx, node) in config.nodes_ext.into_iter().enumerate() {
let node_hostname = Self::parse_config_hostname(&node.hostname, source_hostname);

let mut alt_addresses = HashMap::new();
for (network_type, alt_addrs) in node.alternate_addresses {
let alt_hostname = Self::parse_config_hostname(&alt_addrs.hostname, &node_hostname);
let this_address = Self::parse_config_hosts_into(&alt_hostname, alt_addrs.ports);

alt_addresses.insert(network_type, this_address);
}

let this_node = ParsedConfigNode {
has_data: node_idx < len_nodes,
addresses: Self::parse_config_hosts_into(
&node_hostname,
node.services.unwrap_or_default(),
),
alt_addresses,
};

nodes.push(this_node);
}

let bucket = if let Some(bucket_name) = config.name {
let bucket_uuid = config.uuid.unwrap_or_default();
let (bucket_type, vbucket_map) = match config.node_locator.as_str() {
"vbucket" => (
BucketType::Couchbase,
Self::parse_vbucket_server_map(config.vbucket_server_map)?,
),
_ => (BucketType::Invalid, None),
};

Some(ParsedConfigBucket {
bucket_uuid,
bucket_name,
bucket_type,
vbucket_map,
})
} else {
None
};

let features = if let Some(caps) = config.cluster_capabilities.get("fts") {
ParsedConfigFeatures {
fts_vector_search: caps.contains(&"vectorSearch".to_string()),
}
} else {
ParsedConfigFeatures {
fts_vector_search: false,
}
};

Ok(ParsedConfig {
rev_id,
rev_epoch,
bucket,
nodes,
features,
})
}

fn parse_config_hostname(hostname: &str, source_hostname: &str) -> String {
if hostname.is_empty() {
return source_hostname.to_string();
}

if hostname.contains(':') {
return format!("[{}]", hostname);
}

hostname.to_string()
}

fn parse_config_hosts_into(
hostname: &str,
ports: TerseExtNodePorts,
) -> ParsedConfigNodeAddresses {
ParsedConfigNodeAddresses {
hostname: hostname.to_string(),
non_ssl_ports: ParsedConfigNodePorts {
kv: ports.kv,
mgmt: ports.mgmt,
query: ports.n1ql,
search: ports.fts,
analytics: ports.cbas,
},
ssl_ports: ParsedConfigNodePorts {
kv: ports.kv_ssl,
mgmt: ports.mgmt_ssl,
query: ports.n1ql_ssl,
search: ports.fts_ssl,
analytics: ports.cbas_ssl,
},
}
}

fn parse_vbucket_server_map(
vbucket_server_map: Option<VBucketServerMap>,
) -> CoreResult<Option<VbucketMap>> {
if let Some(vbucket_server_map) = vbucket_server_map {
if vbucket_server_map.vbucket_map.is_empty() {
return Ok(None);
}

return Ok(Some(VbucketMap::new(
vbucket_server_map.vbucket_map,
vbucket_server_map.num_replicas,
)?));
}

Ok(None)
}
}
Loading
Loading