diff --git a/sdk/couchbase-core/src/agent.rs b/sdk/couchbase-core/src/agent.rs index 5ae2cf16..ab4a071f 100644 --- a/sdk/couchbase-core/src/agent.rs +++ b/sdk/couchbase-core/src/agent.rs @@ -668,7 +668,12 @@ impl Agent { Err(_e) => continue, }; - let raw_config = match client.get_cluster_config(GetClusterConfigRequest {}).await { + let raw_config = match client + .get_cluster_config(GetClusterConfigRequest { + known_version: None, + }) + .await + { Ok(resp) => resp.config, Err(_e) => continue, }; diff --git a/sdk/couchbase-core/src/configwatcher.rs b/sdk/couchbase-core/src/configwatcher.rs index b6269f91..a787a720 100644 --- a/sdk/couchbase-core/src/configwatcher.rs +++ b/sdk/couchbase-core/src/configwatcher.rs @@ -5,7 +5,8 @@ use crate::error::{Error, Result}; use crate::kvclient::KvClient; use crate::kvclient_ops::KvClientOps; use crate::kvclientmanager::KvClientManager; -use crate::memdx::request::GetClusterConfigRequest; +use crate::memdx::hello_feature::HelloFeature; +use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest}; use crate::parsedconfig::ParsedConfig; use futures::future::err; use log::{debug, error, trace}; @@ -106,7 +107,7 @@ where recent_endpoints.push(endpoint.clone()); - let parsed_config = match self.poll_one(&endpoint).await { + let parsed_config = match self.poll_one(&endpoint, &last_sent_config).await { Ok(c) => c, Err(e) => { // TODO: log @@ -117,21 +118,23 @@ where all_endpoints_failed = false; - if let Some(config) = &last_sent_config { - if let Some(cmp) = parsed_config.partial_cmp(config) { - if cmp == Ordering::Greater { - // TODO: log. - on_new_config_tx - .send(parsed_config.clone()) - .unwrap_or_default(); - last_sent_config = Some(parsed_config); + if let Some(parsed_config) = parsed_config { + if let Some(config) = &last_sent_config { + if let Some(cmp) = parsed_config.partial_cmp(config) { + if cmp == Ordering::Greater { + // TODO: log. + on_new_config_tx + .send(parsed_config.clone()) + .unwrap_or_default(); + last_sent_config = Some(parsed_config); + } } + } else { + on_new_config_tx + .send(parsed_config.clone()) + .unwrap_or_default(); + last_sent_config = Some(parsed_config); } - } else { - on_new_config_tx - .send(parsed_config.clone()) - .unwrap_or_default(); - last_sent_config = Some(parsed_config); } select! { @@ -143,23 +146,48 @@ where } } - async fn poll_one(&self, endpoint: &str) -> Result { + async fn poll_one( + &self, + endpoint: &str, + latest_config: &Option, + ) -> Result> { debug!("Polling config from {}", &endpoint); let client = self.kv_client_manager.get_client(endpoint).await?; let hostname = client.remote_hostname(); + let known_version = { + if let Some(latest_config) = latest_config { + if latest_config.rev_id > 0 + && client.has_feature(HelloFeature::ClusterMapKnownVersion) + { + Some(GetClusterConfigKnownVersion { + rev_epoch: latest_config.rev_epoch, + rev_id: latest_config.rev_id, + }) + } else { + None + } + } else { + None + } + }; let resp = client - .get_cluster_config(GetClusterConfigRequest {}) + .get_cluster_config(GetClusterConfigRequest { known_version }) .await .map_err(Error::new_contextual_memdx_error)?; + if resp.config.is_empty() { + debug!("Poller received empty config"); + return Ok(None); + } + let config = cbconfig::parse::parse_terse_config(&resp.config, hostname)?; trace!("Poller fetched new config {:?}", &config); - ConfigParser::parse_terse_config(config, hostname) + Ok(Some(ConfigParser::parse_terse_config(config, hostname)?)) } } diff --git a/sdk/couchbase-core/src/kvclient.rs b/sdk/couchbase-core/src/kvclient.rs index 71aa19db..68c19633 100644 --- a/sdk/couchbase-core/src/kvclient.rs +++ b/sdk/couchbase-core/src/kvclient.rs @@ -134,6 +134,7 @@ where HelloFeature::CreateAsDeleted, HelloFeature::AltRequests, HelloFeature::Collections, + HelloFeature::ClusterMapKnownVersion, ] }; diff --git a/sdk/couchbase-core/src/memdx/hello_feature.rs b/sdk/couchbase-core/src/memdx/hello_feature.rs index 90f711de..eda63746 100644 --- a/sdk/couchbase-core/src/memdx/hello_feature.rs +++ b/sdk/couchbase-core/src/memdx/hello_feature.rs @@ -22,6 +22,7 @@ pub enum HelloFeature { PointInTimeRecovery, CreateAsDeleted, ReplaceBodyWithXattr, + ClusterMapKnownVersion, Unknown(u16), } @@ -50,6 +51,7 @@ impl From for u16 { HelloFeature::PointInTimeRecovery => 0x16, HelloFeature::CreateAsDeleted => 0x17, HelloFeature::ReplaceBodyWithXattr => 0x19, + HelloFeature::ClusterMapKnownVersion => 0x1d, HelloFeature::Unknown(code) => code, } } @@ -80,6 +82,7 @@ impl From for HelloFeature { 0x16 => HelloFeature::PointInTimeRecovery, 0x17 => HelloFeature::CreateAsDeleted, 0x19 => HelloFeature::ReplaceBodyWithXattr, + 0x1d => HelloFeature::ClusterMapKnownVersion, code => HelloFeature::Unknown(code), } } diff --git a/sdk/couchbase-core/src/memdx/ops_core.rs b/sdk/couchbase-core/src/memdx/ops_core.rs index aae5b38c..3796471c 100644 --- a/sdk/couchbase-core/src/memdx/ops_core.rs +++ b/sdk/couchbase-core/src/memdx/ops_core.rs @@ -19,6 +19,7 @@ use crate::memdx::response::{ SASLListMechsResponse, SASLStepResponse, SelectBucketResponse, }; use crate::memdx::status::Status; +use byteorder::ByteOrder; pub struct OpsCore {} @@ -158,11 +159,21 @@ impl OpBootstrapEncoder for OpsCore { async fn get_cluster_config( &self, dispatcher: &D, - _request: GetClusterConfigRequest, + request: GetClusterConfigRequest, ) -> Result> where D: Dispatcher, { + let mut extra_buf = [0; 16]; + let extras = if let Some(known_version) = request.known_version { + byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64); + byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64); + + Some(&extra_buf[..]) + } else { + None + }; + let op = dispatcher .dispatch( RequestPacket { @@ -171,7 +182,7 @@ impl OpBootstrapEncoder for OpsCore { datatype: 0, vbucket_id: None, cas: None, - extras: None, + extras, key: None, value: None, framing_extras: None, diff --git a/sdk/couchbase-core/src/memdx/request.rs b/sdk/couchbase-core/src/memdx/request.rs index 1ee21985..7924162b 100644 --- a/sdk/couchbase-core/src/memdx/request.rs +++ b/sdk/couchbase-core/src/memdx/request.rs @@ -43,11 +43,21 @@ impl SelectBucketRequest { } #[derive(Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub struct GetClusterConfigRequest {} +pub struct GetClusterConfigKnownVersion { + pub rev_epoch: i64, + pub rev_id: i64, +} + +#[derive(Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct GetClusterConfigRequest { + pub known_version: Option, +} impl GetClusterConfigRequest { pub fn new() -> Self { - Self {} + Self { + known_version: None, + } } } diff --git a/sdk/couchbase-core/src/retry.rs b/sdk/couchbase-core/src/retry.rs index 418eb841..59d5269c 100644 --- a/sdk/couchbase-core/src/retry.rs +++ b/sdk/couchbase-core/src/retry.rs @@ -199,8 +199,11 @@ impl Display for RetryInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}", - self.operation, self.unique_id.as_ref().unwrap_or(&"".to_string()), self.is_idempotent, self.retry_attempts, + "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}", + self.operation, + self.unique_id.as_ref().unwrap_or(&"".to_string()), + self.is_idempotent, + self.retry_attempts, self.retry_reasons .iter() .map(|r| r.to_string())