Skip to content

Commit 1ae2ade

Browse files
committed
RSCBC-28: Add support for deduped nmvb cluster maps
1 parent 467bdd3 commit 1ae2ade

File tree

3 files changed

+40
-41
lines changed

3 files changed

+40
-41
lines changed

sdk/couchbase-core/src/kvclient.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ where
135135
HelloFeature::AltRequests,
136136
HelloFeature::Collections,
137137
HelloFeature::ClusterMapKnownVersion,
138+
HelloFeature::DedupeNotMyVbucketClustermap,
138139
]
139140
};
140141

sdk/couchbase-core/src/memdx/hello_feature.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub enum HelloFeature {
2323
CreateAsDeleted,
2424
ReplaceBodyWithXattr,
2525
ClusterMapKnownVersion,
26+
DedupeNotMyVbucketClustermap,
2627
Unknown(u16),
2728
}
2829

@@ -52,6 +53,7 @@ impl From<HelloFeature> for u16 {
5253
HelloFeature::CreateAsDeleted => 0x17,
5354
HelloFeature::ReplaceBodyWithXattr => 0x19,
5455
HelloFeature::ClusterMapKnownVersion => 0x1d,
56+
HelloFeature::DedupeNotMyVbucketClustermap => 0x1e,
5557
HelloFeature::Unknown(code) => code,
5658
}
5759
}
@@ -83,6 +85,7 @@ impl From<u16> for HelloFeature {
8385
0x17 => HelloFeature::CreateAsDeleted,
8486
0x19 => HelloFeature::ReplaceBodyWithXattr,
8587
0x1d => HelloFeature::ClusterMapKnownVersion,
88+
0x1e => HelloFeature::DedupeNotMyVbucketClustermap,
8689
code => HelloFeature::Unknown(code),
8790
}
8891
}

sdk/couchbase-core/src/vbucketrouter.rs

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
use std::future::Future;
2-
use std::sync::{Arc, Mutex, MutexGuard};
3-
41
use crate::cbconfig::TerseConfig;
52
use crate::error::ErrorKind;
63
use crate::error::Result;
74
use crate::memdx::error::ServerErrorKind;
85
use crate::memdx::response::TryFromClientResponse;
96
use crate::nmvbhandler::NotMyVbucketConfigHandler;
107
use crate::vbucketmap::VbucketMap;
8+
use log::debug;
9+
use std::future::Future;
10+
use std::sync::{Arc, Mutex, MutexGuard};
1111

1212
pub(crate) trait VbucketRouter: Send + Sync {
1313
fn update_vbucket_info(&self, info: VbucketRoutingInfo);
@@ -102,59 +102,54 @@ where
102102
V: VbucketRouter,
103103
Fut: Future<Output = Result<Resp>> + Send,
104104
{
105-
let (mut endpoint, mut vb_id) = vb.dispatch_by_key(key, vb_server_idx)?;
105+
let (mut endpoint, vb_id) = vb.dispatch_by_key(key, vb_server_idx)?;
106106

107-
loop {
108-
let err = match operation(endpoint.clone(), vb_id).await {
109-
Ok(r) => return Ok(r),
110-
Err(e) => e,
111-
};
107+
let err = match operation(endpoint.clone(), vb_id).await {
108+
Ok(r) => return Ok(r),
109+
Err(e) => e,
110+
};
112111

113-
let config = if let Some(memdx_err) = err.is_memdx_error() {
114-
if memdx_err.is_server_error_kind(ServerErrorKind::NotMyVbucket) {
115-
if let Some(config) = memdx_err.has_server_config() {
116-
config
117-
} else {
118-
return Err(err);
119-
}
112+
let config = if let Some(memdx_err) = err.is_memdx_error() {
113+
if memdx_err.is_server_error_kind(ServerErrorKind::NotMyVbucket) {
114+
if let Some(config) = memdx_err.has_server_config() {
115+
config
120116
} else {
117+
// This will automatically get retried by the retry manager.
118+
debug!("Received empty NMVB response");
121119
return Err(err);
122120
}
123121
} else {
124122
return Err(err);
125-
};
126-
127-
if config.is_empty() {
128-
return Err(err);
129123
}
124+
} else {
125+
return Err(err);
126+
};
130127

131-
let value = match std::str::from_utf8(config.as_slice()) {
132-
Ok(v) => v.to_string(),
133-
Err(_e) => "".to_string(),
134-
};
135-
136-
let config = value.replace("$HOST", endpoint.as_ref());
128+
if config.is_empty() {
129+
// This shouldn't happen.
130+
return Err(err);
131+
}
137132

138-
let config_json: TerseConfig = match serde_json::from_str(&config) {
139-
Ok(c) => c,
140-
Err(_) => {
141-
return Err(err);
142-
}
143-
};
133+
let value = match std::str::from_utf8(config.as_slice()) {
134+
Ok(v) => v.to_string(),
135+
Err(_e) => "".to_string(),
136+
};
144137

145-
nmvb_handler
146-
.clone()
147-
.not_my_vbucket_config(config_json, &endpoint)
148-
.await;
138+
let config = value.replace("$HOST", endpoint.as_ref());
149139

150-
let (new_endpoint, new_vb_id) = vb.dispatch_by_key(key, vb_server_idx)?;
151-
if new_endpoint == endpoint && new_vb_id == vb_id {
140+
let config_json: TerseConfig = match serde_json::from_str(&config) {
141+
Ok(c) => c,
142+
Err(_) => {
152143
return Err(err);
153144
}
145+
};
154146

155-
endpoint = new_endpoint;
156-
vb_id = new_vb_id;
157-
}
147+
nmvb_handler
148+
.clone()
149+
.not_my_vbucket_config(config_json, &endpoint)
150+
.await;
151+
152+
Err(err)
158153
}
159154

160155
#[cfg(test)]

0 commit comments

Comments
 (0)