Skip to content

Commit 5b006dc

Browse files
committed
RSCBC-28: Add support for config known versions
1 parent 8ae1f8d commit 5b006dc

File tree

8 files changed

+87
-26
lines changed

8 files changed

+87
-26
lines changed

sdk/couchbase-core/src/agent.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,12 @@ impl Agent {
668668
Err(_e) => continue,
669669
};
670670

671-
let raw_config = match client.get_cluster_config(GetClusterConfigRequest {}).await {
671+
let raw_config = match client
672+
.get_cluster_config(GetClusterConfigRequest {
673+
known_version: None,
674+
})
675+
.await
676+
{
672677
Ok(resp) => resp.config,
673678
Err(_e) => continue,
674679
};

sdk/couchbase-core/src/configwatcher.rs

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use crate::error::{Error, Result};
55
use crate::kvclient::KvClient;
66
use crate::kvclient_ops::KvClientOps;
77
use crate::kvclientmanager::KvClientManager;
8-
use crate::memdx::request::GetClusterConfigRequest;
8+
use crate::memdx::hello_feature::HelloFeature;
9+
use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest};
910
use crate::parsedconfig::ParsedConfig;
1011
use futures::future::err;
1112
use log::{debug, error, trace};
@@ -106,7 +107,7 @@ where
106107

107108
recent_endpoints.push(endpoint.clone());
108109

109-
let parsed_config = match self.poll_one(&endpoint).await {
110+
let parsed_config = match self.poll_one(&endpoint, &last_sent_config).await {
110111
Ok(c) => c,
111112
Err(e) => {
112113
// TODO: log
@@ -117,21 +118,23 @@ where
117118

118119
all_endpoints_failed = false;
119120

120-
if let Some(config) = &last_sent_config {
121-
if let Some(cmp) = parsed_config.partial_cmp(config) {
122-
if cmp == Ordering::Greater {
123-
// TODO: log.
124-
on_new_config_tx
125-
.send(parsed_config.clone())
126-
.unwrap_or_default();
127-
last_sent_config = Some(parsed_config);
121+
if let Some(parsed_config) = parsed_config {
122+
if let Some(config) = &last_sent_config {
123+
if let Some(cmp) = parsed_config.partial_cmp(config) {
124+
if cmp == Ordering::Greater {
125+
// TODO: log.
126+
on_new_config_tx
127+
.send(parsed_config.clone())
128+
.unwrap_or_default();
129+
last_sent_config = Some(parsed_config);
130+
}
128131
}
132+
} else {
133+
on_new_config_tx
134+
.send(parsed_config.clone())
135+
.unwrap_or_default();
136+
last_sent_config = Some(parsed_config);
129137
}
130-
} else {
131-
on_new_config_tx
132-
.send(parsed_config.clone())
133-
.unwrap_or_default();
134-
last_sent_config = Some(parsed_config);
135138
}
136139

137140
select! {
@@ -143,23 +146,48 @@ where
143146
}
144147
}
145148

146-
async fn poll_one(&self, endpoint: &str) -> Result<ParsedConfig> {
149+
async fn poll_one(
150+
&self,
151+
endpoint: &str,
152+
latest_config: &Option<ParsedConfig>,
153+
) -> Result<Option<ParsedConfig>> {
147154
debug!("Polling config from {}", &endpoint);
148155

149156
let client = self.kv_client_manager.get_client(endpoint).await?;
150157

151158
let hostname = client.remote_hostname();
159+
let known_version = {
160+
if let Some(latest_config) = latest_config {
161+
if latest_config.rev_id > 0
162+
&& client.has_feature(HelloFeature::ClusterMapKnownVersion)
163+
{
164+
Some(GetClusterConfigKnownVersion {
165+
rev_epoch: latest_config.rev_epoch,
166+
rev_id: latest_config.rev_id,
167+
})
168+
} else {
169+
None
170+
}
171+
} else {
172+
None
173+
}
174+
};
152175

153176
let resp = client
154-
.get_cluster_config(GetClusterConfigRequest {})
177+
.get_cluster_config(GetClusterConfigRequest { known_version })
155178
.await
156179
.map_err(Error::new_contextual_memdx_error)?;
157180

181+
if resp.config.is_empty() {
182+
debug!("Poller received empty config");
183+
return Ok(None);
184+
}
185+
158186
let config = cbconfig::parse::parse_terse_config(&resp.config, hostname)?;
159187

160188
trace!("Poller fetched new config {:?}", &config);
161189

162-
ConfigParser::parse_terse_config(config, hostname)
190+
Ok(Some(ConfigParser::parse_terse_config(config, hostname)?))
163191
}
164192
}
165193

sdk/couchbase-core/src/crudcomponent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,7 @@ impl<
977977
self.conn_manager.clone(),
978978
&endpoint,
979979
async |client: Arc<KvClientManagerClientType<M>>| {
980-
operation(collection_id, vb_id, client).await
980+
operation(collection_id, 3, client).await
981981
},
982982
)
983983
.await

sdk/couchbase-core/src/kvclient.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ where
134134
HelloFeature::CreateAsDeleted,
135135
HelloFeature::AltRequests,
136136
HelloFeature::Collections,
137+
HelloFeature::ClusterMapKnownVersion,
137138
]
138139
};
139140

sdk/couchbase-core/src/memdx/hello_feature.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub enum HelloFeature {
2222
PointInTimeRecovery,
2323
CreateAsDeleted,
2424
ReplaceBodyWithXattr,
25+
ClusterMapKnownVersion,
2526
Unknown(u16),
2627
}
2728

@@ -50,6 +51,7 @@ impl From<HelloFeature> for u16 {
5051
HelloFeature::PointInTimeRecovery => 0x16,
5152
HelloFeature::CreateAsDeleted => 0x17,
5253
HelloFeature::ReplaceBodyWithXattr => 0x19,
54+
HelloFeature::ClusterMapKnownVersion => 0x1d,
5355
HelloFeature::Unknown(code) => code,
5456
}
5557
}
@@ -80,6 +82,7 @@ impl From<u16> for HelloFeature {
8082
0x16 => HelloFeature::PointInTimeRecovery,
8183
0x17 => HelloFeature::CreateAsDeleted,
8284
0x19 => HelloFeature::ReplaceBodyWithXattr,
85+
0x1d => HelloFeature::ClusterMapKnownVersion,
8386
code => HelloFeature::Unknown(code),
8487
}
8588
}

sdk/couchbase-core/src/memdx/ops_core.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::memdx::response::{
1919
SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
2020
};
2121
use crate::memdx::status::Status;
22+
use byteorder::ByteOrder;
2223

2324
pub struct OpsCore {}
2425

@@ -158,11 +159,21 @@ impl OpBootstrapEncoder for OpsCore {
158159
async fn get_cluster_config<D>(
159160
&self,
160161
dispatcher: &D,
161-
_request: GetClusterConfigRequest,
162+
request: GetClusterConfigRequest,
162163
) -> Result<StandardPendingOp<GetClusterConfigResponse>>
163164
where
164165
D: Dispatcher,
165166
{
167+
let mut extra_buf = [0; 16];
168+
let extras = if let Some(known_version) = request.known_version {
169+
byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64);
170+
byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64);
171+
172+
Some(&extra_buf[..])
173+
} else {
174+
None
175+
};
176+
166177
let op = dispatcher
167178
.dispatch(
168179
RequestPacket {
@@ -171,7 +182,7 @@ impl OpBootstrapEncoder for OpsCore {
171182
datatype: 0,
172183
vbucket_id: None,
173184
cas: None,
174-
extras: None,
185+
extras,
175186
key: None,
176187
value: None,
177188
framing_extras: None,

sdk/couchbase-core/src/memdx/request.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,21 @@ impl SelectBucketRequest {
4343
}
4444

4545
#[derive(Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
46-
pub struct GetClusterConfigRequest {}
46+
pub struct GetClusterConfigKnownVersion {
47+
pub rev_epoch: i64,
48+
pub rev_id: i64,
49+
}
50+
51+
#[derive(Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
52+
pub struct GetClusterConfigRequest {
53+
pub known_version: Option<GetClusterConfigKnownVersion>,
54+
}
4755

4856
impl GetClusterConfigRequest {
4957
pub fn new() -> Self {
50-
Self {}
58+
Self {
59+
known_version: None,
60+
}
5161
}
5262
}
5363

sdk/couchbase-core/src/retry.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,11 @@ impl Display for RetryInfo {
199199
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200200
write!(
201201
f,
202-
"{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
203-
self.operation, self.unique_id.as_ref().unwrap_or(&"".to_string()), self.is_idempotent, self.retry_attempts,
202+
"{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
203+
self.operation,
204+
self.unique_id.as_ref().unwrap_or(&"".to_string()),
205+
self.is_idempotent,
206+
self.retry_attempts,
204207
self.retry_reasons
205208
.iter()
206209
.map(|r| r.to_string())

0 commit comments

Comments
 (0)