Skip to content

RSCBC-28: Add support for config known versions #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdk/couchbase-core/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,12 @@ impl Agent {
Err(_e) => continue,
};

let raw_config = match client.get_cluster_config(GetClusterConfigRequest {}).await {
let raw_config = match client
.get_cluster_config(GetClusterConfigRequest {
known_version: None,
})
.await
{
Ok(resp) => resp.config,
Err(_e) => continue,
};
Expand Down
64 changes: 46 additions & 18 deletions sdk/couchbase-core/src/configwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::error::{Error, Result};
use crate::kvclient::KvClient;
use crate::kvclient_ops::KvClientOps;
use crate::kvclientmanager::KvClientManager;
use crate::memdx::request::GetClusterConfigRequest;
use crate::memdx::hello_feature::HelloFeature;
use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest};
use crate::parsedconfig::ParsedConfig;
use futures::future::err;
use log::{debug, error, trace};
Expand Down Expand Up @@ -106,7 +107,7 @@ where

recent_endpoints.push(endpoint.clone());

let parsed_config = match self.poll_one(&endpoint).await {
let parsed_config = match self.poll_one(&endpoint, &last_sent_config).await {
Ok(c) => c,
Err(e) => {
// TODO: log
Expand All @@ -117,21 +118,23 @@ where

all_endpoints_failed = false;

if let Some(config) = &last_sent_config {
if let Some(cmp) = parsed_config.partial_cmp(config) {
if cmp == Ordering::Greater {
// TODO: log.
on_new_config_tx
.send(parsed_config.clone())
.unwrap_or_default();
last_sent_config = Some(parsed_config);
if let Some(parsed_config) = parsed_config {
if let Some(config) = &last_sent_config {
if let Some(cmp) = parsed_config.partial_cmp(config) {
if cmp == Ordering::Greater {
// TODO: log.
on_new_config_tx
.send(parsed_config.clone())
.unwrap_or_default();
last_sent_config = Some(parsed_config);
}
}
} else {
on_new_config_tx
.send(parsed_config.clone())
.unwrap_or_default();
last_sent_config = Some(parsed_config);
}
} else {
on_new_config_tx
.send(parsed_config.clone())
.unwrap_or_default();
last_sent_config = Some(parsed_config);
}

select! {
Expand All @@ -143,23 +146,48 @@ where
}
}

async fn poll_one(&self, endpoint: &str) -> Result<ParsedConfig> {
async fn poll_one(
&self,
endpoint: &str,
latest_config: &Option<ParsedConfig>,
) -> Result<Option<ParsedConfig>> {
debug!("Polling config from {}", &endpoint);

let client = self.kv_client_manager.get_client(endpoint).await?;

let hostname = client.remote_hostname();
let known_version = {
if let Some(latest_config) = latest_config {
if latest_config.rev_id > 0
&& client.has_feature(HelloFeature::ClusterMapKnownVersion)
{
Some(GetClusterConfigKnownVersion {
rev_epoch: latest_config.rev_epoch,
rev_id: latest_config.rev_id,
})
} else {
None
}
} else {
None
}
};

let resp = client
.get_cluster_config(GetClusterConfigRequest {})
.get_cluster_config(GetClusterConfigRequest { known_version })
.await
.map_err(Error::new_contextual_memdx_error)?;

if resp.config.is_empty() {
debug!("Poller received empty config");
return Ok(None);
}

let config = cbconfig::parse::parse_terse_config(&resp.config, hostname)?;

trace!("Poller fetched new config {:?}", &config);

ConfigParser::parse_terse_config(config, hostname)
Ok(Some(ConfigParser::parse_terse_config(config, hostname)?))
}
}

Expand Down
1 change: 1 addition & 0 deletions sdk/couchbase-core/src/kvclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ where
HelloFeature::CreateAsDeleted,
HelloFeature::AltRequests,
HelloFeature::Collections,
HelloFeature::ClusterMapKnownVersion,
]
};

Expand Down
3 changes: 3 additions & 0 deletions sdk/couchbase-core/src/memdx/hello_feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum HelloFeature {
PointInTimeRecovery,
CreateAsDeleted,
ReplaceBodyWithXattr,
ClusterMapKnownVersion,
Unknown(u16),
}

Expand Down Expand Up @@ -50,6 +51,7 @@ impl From<HelloFeature> for u16 {
HelloFeature::PointInTimeRecovery => 0x16,
HelloFeature::CreateAsDeleted => 0x17,
HelloFeature::ReplaceBodyWithXattr => 0x19,
HelloFeature::ClusterMapKnownVersion => 0x1d,
HelloFeature::Unknown(code) => code,
}
}
Expand Down Expand Up @@ -80,6 +82,7 @@ impl From<u16> for HelloFeature {
0x16 => HelloFeature::PointInTimeRecovery,
0x17 => HelloFeature::CreateAsDeleted,
0x19 => HelloFeature::ReplaceBodyWithXattr,
0x1d => HelloFeature::ClusterMapKnownVersion,
code => HelloFeature::Unknown(code),
}
}
Expand Down
15 changes: 13 additions & 2 deletions sdk/couchbase-core/src/memdx/ops_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::memdx::response::{
SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
};
use crate::memdx::status::Status;
use byteorder::ByteOrder;

pub struct OpsCore {}

Expand Down Expand Up @@ -158,11 +159,21 @@ impl OpBootstrapEncoder for OpsCore {
async fn get_cluster_config<D>(
&self,
dispatcher: &D,
_request: GetClusterConfigRequest,
request: GetClusterConfigRequest,
) -> Result<StandardPendingOp<GetClusterConfigResponse>>
where
D: Dispatcher,
{
let mut extra_buf = [0; 16];
let extras = if let Some(known_version) = request.known_version {
byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64);
byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64);

Some(&extra_buf[..])
} else {
None
};

let op = dispatcher
.dispatch(
RequestPacket {
Expand All @@ -171,7 +182,7 @@ impl OpBootstrapEncoder for OpsCore {
datatype: 0,
vbucket_id: None,
cas: None,
extras: None,
extras,
key: None,
value: None,
framing_extras: None,
Expand Down
14 changes: 12 additions & 2 deletions sdk/couchbase-core/src/memdx/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,21 @@ impl SelectBucketRequest {
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct GetClusterConfigRequest {}
pub struct GetClusterConfigKnownVersion {
pub rev_epoch: i64,
pub rev_id: i64,
}

#[derive(Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct GetClusterConfigRequest {
pub known_version: Option<GetClusterConfigKnownVersion>,
}

impl GetClusterConfigRequest {
pub fn new() -> Self {
Self {}
Self {
known_version: None,
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions sdk/couchbase-core/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,11 @@ impl Display for RetryInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
self.operation, self.unique_id.as_ref().unwrap_or(&"".to_string()), self.is_idempotent, self.retry_attempts,
"{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
self.operation,
self.unique_id.as_ref().unwrap_or(&"".to_string()),
self.is_idempotent,
self.retry_attempts,
self.retry_reasons
.iter()
.map(|r| r.to_string())
Expand Down