Skip to content

Commit 4edc02d

Browse files
committed
Implement config watcher memd
1 parent 9fb69d8 commit 4edc02d

File tree

12 files changed

+836
-72
lines changed

12 files changed

+836
-72
lines changed

sdk/couchbase-core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ base64 = "0.22.1"
2323
webpki = "0.22.4"
2424
async-trait = "0.1.80"
2525
tokio-io = { version = "0.2.0-alpha.6", features = ["util"] }
26+
crc32fast = "1.4.2"
27+
serde_json = "1.0.120"
2628

2729
[dev-dependencies]
2830
env_logger = "0.11"

sdk/couchbase-core/src/cbconfig/mod.rs

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,132 +5,132 @@ use serde::Deserialize;
55
#[derive(Deserialize, Debug)]
66
pub struct VBucketServerMap {
77
#[serde(alias = "hashAlgorithm")]
8-
hash_algorithm: String,
8+
pub hash_algorithm: String,
99
#[serde(alias = "numReplicas")]
10-
num_replicas: i8,
10+
pub num_replicas: usize,
1111
#[serde(alias = "serverList")]
12-
server_list: Vec<String>,
12+
pub server_list: Vec<String>,
1313
#[serde(alias = "vBucketMap")]
14-
vbucket_map: Vec<Vec<i8>>,
14+
pub vbucket_map: Vec<Vec<i16>>,
1515
}
1616

1717
#[derive(Deserialize, Debug)]
1818
pub struct ConfigDDocs {
1919
#[serde(alias = "uri")]
20-
uri: String,
20+
pub uri: String,
2121
}
2222

23-
#[derive(Deserialize, Debug)]
23+
#[derive(Deserialize, Debug, Default)]
2424
pub struct TerseExtNodePorts {
2525
#[serde(alias = "kv")]
26-
kv: Option<i16>,
26+
pub kv: Option<i64>,
2727
#[serde(alias = "capi")]
28-
capi: Option<i16>,
28+
pub capi: Option<i64>,
2929
#[serde(alias = "mgmt")]
30-
mgmt: Option<i16>,
30+
pub mgmt: i64,
3131
#[serde(alias = "n1ql")]
32-
n1ql: Option<i16>,
32+
pub n1ql: Option<i64>,
3333
#[serde(alias = "fts")]
34-
fts: Option<i16>,
34+
pub fts: Option<i64>,
3535
#[serde(alias = "cbas")]
36-
cbas: Option<i16>,
36+
pub cbas: Option<i64>,
3737
#[serde(alias = "eventingAdminPort")]
38-
eventing: Option<i16>,
38+
pub eventing: Option<i64>,
3939
#[serde(alias = "indexHttp")]
40-
gsi: Option<i16>,
40+
pub gsi: Option<i64>,
4141
#[serde(alias = "backupAPI")]
42-
backup: Option<i16>,
42+
pub backup: Option<i64>,
4343

4444
#[serde(alias = "kvSSL")]
45-
kv_ssl: Option<i16>,
45+
pub kv_ssl: Option<i64>,
4646
#[serde(alias = "capiSSL")]
47-
capi_ssl: Option<i16>,
47+
pub capi_ssl: Option<i64>,
4848
#[serde(alias = "mgmtSSL")]
49-
mgmt_ssl: Option<i16>,
49+
pub mgmt_ssl: i64,
5050
#[serde(alias = "n1qlSSL")]
51-
n1ql_ssl: Option<i16>,
51+
pub n1ql_ssl: Option<i64>,
5252
#[serde(alias = "ftsSSL")]
53-
fts_ssl: Option<i16>,
53+
pub fts_ssl: Option<i64>,
5454
#[serde(alias = "cbasSSL")]
55-
cbas_ssl: Option<i16>,
55+
pub cbas_ssl: Option<i64>,
5656
#[serde(alias = "eventingSSL")]
57-
eventing_ssl: Option<i16>,
57+
pub eventing_ssl: Option<i64>,
5858
#[serde(alias = "indexHttps")]
59-
gsi_ssl: Option<i16>,
59+
pub gsi_ssl: Option<i64>,
6060
#[serde(alias = "backupAPIHTTPS")]
61-
backup_ssl: Option<i16>,
61+
pub backup_ssl: Option<i64>,
6262
}
6363

6464
#[derive(Deserialize, Debug)]
6565
pub struct TerseExtNodeAltAddresses {
6666
#[serde(alias = "ports")]
67-
ports: Option<TerseExtNodePorts>,
67+
pub ports: TerseExtNodePorts,
6868
#[serde(alias = "hostname")]
69-
hostname: Option<String>,
69+
pub hostname: String,
7070
}
7171

7272
#[derive(Deserialize, Debug)]
7373
pub struct TerseNodePorts {
7474
#[serde(alias = "direct")]
75-
direct: Option<u16>,
75+
pub direct: Option<u16>,
7676
#[serde(alias = "proxy")]
77-
proxy: Option<u16>,
77+
pub proxy: Option<u16>,
7878
}
7979

8080
#[derive(Deserialize, Debug)]
8181
pub struct TerseNodeConfig {
8282
#[serde(alias = "couchbaseApiBase")]
83-
couchbase_api_base: Option<String>,
83+
pub couchbase_api_base: Option<String>,
8484
#[serde(alias = "hostname")]
85-
hostname: Option<String>,
85+
pub hostname: Option<String>,
8686
#[serde(alias = "ports")]
87-
ports: Option<TerseNodePorts>,
87+
pub ports: Option<TerseNodePorts>,
8888
}
8989

9090
#[derive(Deserialize, Debug)]
9191
pub struct TerseNodeExtConfig {
9292
#[serde(alias = "services")]
93-
services: Option<TerseExtNodePorts>,
93+
pub services: Option<TerseExtNodePorts>,
9494
#[serde(alias = "thisNode")]
95-
this_node: Option<bool>,
95+
pub this_node: Option<bool>,
9696
#[serde(alias = "hostname")]
97-
hostname: Option<String>,
98-
#[serde(alias = "alternateAddresses")]
99-
alternate_addresses: Option<TerseExtNodeAltAddresses>,
97+
pub hostname: String,
98+
#[serde(alias = "alternateAddresses", default)]
99+
pub alternate_addresses: HashMap<String, TerseExtNodeAltAddresses>,
100100
}
101101

102102
#[derive(Deserialize, Debug)]
103103
pub struct TerseConfig {
104104
#[serde(alias = "rev")]
105-
rev: i64,
105+
pub rev: i64,
106106
#[serde(alias = "revEpoch")]
107-
rev_epoch: Option<i64>,
107+
pub rev_epoch: Option<i64>,
108108
#[serde(alias = "name")]
109-
name: Option<String>,
109+
pub name: Option<String>,
110110
#[serde(alias = "nodeLocator")]
111-
node_locator: Option<String>,
111+
pub node_locator: String,
112112
#[serde(alias = "uuid")]
113-
uuid: Option<String>,
113+
pub uuid: Option<String>,
114114
#[serde(alias = "uri")]
115-
uri: Option<String>,
115+
pub uri: Option<String>,
116116
#[serde(alias = "streamingUri")]
117-
streaming_uri: Option<String>,
117+
pub streaming_uri: Option<String>,
118118
#[serde(alias = "bucketCapabilitiesVer")]
119-
bucket_capabilities_ver: Option<String>,
119+
pub bucket_capabilities_ver: Option<String>,
120120
#[serde(alias = "bucketCapabilities")]
121-
bucket_capabilities: Vec<String>,
121+
pub bucket_capabilities: Vec<String>,
122122
#[serde(alias = "collectionsManifestUid")]
123-
collections_manifest_uuid: Option<String>,
123+
pub collections_manifest_uuid: Option<String>,
124124
#[serde(alias = "ddocs")]
125-
ddocs: Option<ConfigDDocs>,
125+
pub ddocs: Option<ConfigDDocs>,
126126
#[serde(alias = "vBucketServerMap")]
127-
vbucket_server_map: Option<VBucketServerMap>,
127+
pub vbucket_server_map: Option<VBucketServerMap>,
128128
#[serde(alias = "nodes")]
129-
nodes: Vec<TerseNodeConfig>,
130-
#[serde(alias = "nodes_ext")]
131-
nodes_ext: Vec<TerseNodeExtConfig>,
129+
pub nodes: Vec<TerseNodeConfig>,
130+
#[serde(alias = "nodesExt")]
131+
pub nodes_ext: Vec<TerseNodeExtConfig>,
132132
#[serde(alias = "clusterCapabilitiesVer")]
133-
cluster_capabilities_ver: Vec<i64>,
133+
pub cluster_capabilities_ver: Vec<i64>,
134134
#[serde(alias = "clusterCapabilities")]
135-
cluster_capabilities: HashMap<String, Vec<String>>,
135+
pub cluster_capabilities: HashMap<String, Vec<String>>,
136136
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use std::collections::HashMap;
2+
3+
use crate::cbconfig::{TerseConfig, TerseExtNodePorts, VBucketServerMap};
4+
use crate::parsedconfig::{
5+
BucketType, ParsedConfig, ParsedConfigBucket, ParsedConfigFeatures, ParsedConfigNode,
6+
ParsedConfigNodeAddresses, ParsedConfigNodePorts,
7+
};
8+
use crate::result::CoreResult;
9+
use crate::vbucketmap::VbucketMap;
10+
11+
pub(crate) struct ConfigParser {}
12+
13+
impl ConfigParser {
14+
pub fn parse_terse_config(
15+
config: TerseConfig,
16+
source_hostname: &str,
17+
) -> CoreResult<ParsedConfig> {
18+
let rev_id = config.rev;
19+
let rev_epoch = config.rev_epoch.unwrap_or_default();
20+
21+
let len_nodes = config.nodes.len();
22+
let mut nodes = Vec::with_capacity(config.nodes_ext.len());
23+
for (node_idx, node) in config.nodes_ext.into_iter().enumerate() {
24+
let node_hostname = Self::parse_config_hostname(&node.hostname, source_hostname);
25+
26+
let mut alt_addresses = HashMap::new();
27+
for (network_type, alt_addrs) in node.alternate_addresses {
28+
let alt_hostname = Self::parse_config_hostname(&alt_addrs.hostname, &node_hostname);
29+
let this_address = Self::parse_config_hosts_into(&alt_hostname, alt_addrs.ports);
30+
31+
alt_addresses.insert(network_type, this_address);
32+
}
33+
34+
let this_node = ParsedConfigNode {
35+
has_data: node_idx < len_nodes,
36+
addresses: Self::parse_config_hosts_into(
37+
&node_hostname,
38+
node.services.unwrap_or_default(),
39+
),
40+
alt_addresses,
41+
};
42+
43+
nodes.push(this_node);
44+
}
45+
46+
let bucket = if let Some(bucket_name) = config.name {
47+
let bucket_uuid = config.uuid.unwrap_or_default();
48+
let (bucket_type, vbucket_map) = match config.node_locator.as_str() {
49+
"vbucket" => (
50+
BucketType::Couchbase,
51+
Self::parse_vbucket_server_map(config.vbucket_server_map)?,
52+
),
53+
_ => (BucketType::Invalid, None),
54+
};
55+
56+
Some(ParsedConfigBucket {
57+
bucket_uuid,
58+
bucket_name,
59+
bucket_type,
60+
vbucket_map,
61+
})
62+
} else {
63+
None
64+
};
65+
66+
let features = if let Some(caps) = config.cluster_capabilities.get("fts") {
67+
ParsedConfigFeatures {
68+
fts_vector_search: caps.contains(&"vectorSearch".to_string()),
69+
}
70+
} else {
71+
ParsedConfigFeatures {
72+
fts_vector_search: false,
73+
}
74+
};
75+
76+
Ok(ParsedConfig {
77+
rev_id,
78+
rev_epoch,
79+
bucket,
80+
nodes,
81+
features,
82+
})
83+
}
84+
85+
fn parse_config_hostname(hostname: &str, source_hostname: &str) -> String {
86+
if hostname.is_empty() {
87+
return source_hostname.to_string();
88+
}
89+
90+
if hostname.contains(':') {
91+
return format!("[{}]", hostname);
92+
}
93+
94+
hostname.to_string()
95+
}
96+
97+
fn parse_config_hosts_into(
98+
hostname: &str,
99+
ports: TerseExtNodePorts,
100+
) -> ParsedConfigNodeAddresses {
101+
ParsedConfigNodeAddresses {
102+
hostname: hostname.to_string(),
103+
non_ssl_ports: ParsedConfigNodePorts {
104+
kv: ports.kv,
105+
mgmt: ports.mgmt,
106+
query: ports.n1ql,
107+
search: ports.fts,
108+
analytics: ports.cbas,
109+
},
110+
ssl_ports: ParsedConfigNodePorts {
111+
kv: ports.kv_ssl,
112+
mgmt: ports.mgmt_ssl,
113+
query: ports.n1ql_ssl,
114+
search: ports.fts_ssl,
115+
analytics: ports.cbas_ssl,
116+
},
117+
}
118+
}
119+
120+
fn parse_vbucket_server_map(
121+
vbucket_server_map: Option<VBucketServerMap>,
122+
) -> CoreResult<Option<VbucketMap>> {
123+
if let Some(vbucket_server_map) = vbucket_server_map {
124+
if vbucket_server_map.vbucket_map.is_empty() {
125+
return Ok(None);
126+
}
127+
128+
return Ok(Some(VbucketMap::new(
129+
vbucket_server_map.vbucket_map,
130+
vbucket_server_map.num_replicas,
131+
)?));
132+
}
133+
134+
Ok(None)
135+
}
136+
}

0 commit comments

Comments
 (0)