Skip to content

Commit 66ee23e

Browse files
committed
RSCBC-28: Add support for cluster map known versions
1 parent 6cf1949 commit 66ee23e

File tree

12 files changed

+157
-111
lines changed

12 files changed

+157
-111
lines changed

sdk/couchbase-core/src/agent.rs

Lines changed: 42 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ use crate::collection_resolver_cached::{
2727
};
2828
use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
2929
use crate::compressionmanager::{CompressionManager, StdCompressor};
30-
use crate::configparser::ConfigParser;
31-
use crate::configwatcher::{
32-
ConfigWatcher, ConfigWatcherMemd, ConfigWatcherMemdConfig, ConfigWatcherMemdOptions,
30+
use crate::configmanager::{
31+
ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
3332
};
33+
use crate::configparser::ConfigParser;
3434
use crate::crudcomponent::CrudComponent;
3535
use crate::errmapcomponent::ErrMapComponent;
3636
use crate::error::{Error, ErrorKind, Result};
@@ -72,7 +72,6 @@ struct AgentState {
7272
num_pool_connections: usize,
7373
// http_transport:
7474
last_clients: HashMap<String, KvClientConfig>,
75-
latest_config: ParsedConfig,
7675
network_type: String,
7776

7877
client_name: String,
@@ -84,7 +83,7 @@ type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<A
8483
pub(crate) struct AgentInner {
8584
state: Arc<Mutex<AgentState>>,
8685

87-
cfg_watcher: Arc<dyn ConfigWatcher>,
86+
config_manager: Arc<dyn ConfigManager>,
8887
conn_mgr: Arc<AgentClientManager>,
8988
vb_router: Arc<StdVbucketRouter>,
9089
collections: Arc<AgentCollectionResolver>,
@@ -111,11 +110,11 @@ pub(crate) struct AgentInner {
111110
pub struct Agent {
112111
pub(crate) inner: Arc<AgentInner>,
113112

114-
config_watcher_shutdown_tx: Sender<()>,
113+
config_manager_shutdown_tx: Sender<()>,
115114
}
116115

117116
struct AgentComponentConfigs {
118-
pub config_watcher_memd_config: ConfigWatcherMemdConfig,
117+
pub config_manager_memd_config: ConfigManagerMemdConfig,
119118
pub kv_client_manager_client_configs: HashMap<String, KvClientConfig>,
120119
pub vbucket_routing_info: VbucketRoutingInfo,
121120
pub query_config: QueryComponentConfig,
@@ -129,18 +128,13 @@ impl AgentInner {
129128
pub async fn apply_config(&self, config: ParsedConfig) {
130129
let mut state = self.state.lock().await;
131130

132-
if !Self::can_update_config(&config, &state.latest_config) {
133-
return;
134-
}
135-
136131
info!("Applying updated config");
137-
state.latest_config = config;
138132

139-
self.update_state(&mut state).await;
133+
self.update_state(config, &mut state).await;
140134
}
141135

142-
async fn update_state(&self, state: &mut AgentState) {
143-
let agent_component_configs = Self::gen_agent_component_configs(state);
136+
async fn update_state(&self, config: ParsedConfig, state: &mut AgentState) {
137+
let agent_component_configs = Self::gen_agent_component_configs(&config, state);
144138

145139
// In order to avoid race conditions between operations selecting the
146140
// endpoint they need to send the request to, and fetching an actual
@@ -178,8 +172,8 @@ impl AgentInner {
178172
.update_vbucket_info(agent_component_configs.vbucket_routing_info);
179173

180174
if let Err(e) = self
181-
.cfg_watcher
182-
.reconfigure(agent_component_configs.config_watcher_memd_config)
175+
.config_manager
176+
.reconfigure(agent_component_configs.config_manager_memd_config)
183177
{
184178
error!("Failed to reconfigure memd config watcher component; {}", e);
185179
}
@@ -213,27 +207,11 @@ impl AgentInner {
213207
self.mgmt.reconfigure(agent_component_configs.mgmt_config);
214208
}
215209

216-
fn can_update_config(new_config: &ParsedConfig, old_config: &ParsedConfig) -> bool {
217-
if new_config.bucket != old_config.bucket {
218-
debug!("Switching config due to changed bucket type (bucket takeover)");
219-
return true;
220-
} else if let Some(cmp) = new_config.partial_cmp(old_config) {
221-
if cmp == Ordering::Less {
222-
debug!("Skipping config due to new config being an older revision")
223-
} else if cmp == Ordering::Equal {
224-
debug!("Skipping config due to matching revisions")
225-
} else {
226-
return true;
227-
}
228-
}
229-
230-
false
231-
}
232-
233-
fn gen_agent_component_configs(state: &mut AgentState) -> AgentComponentConfigs {
234-
let network_info = state
235-
.latest_config
236-
.addresses_group_for_network_type(&state.network_type);
210+
fn gen_agent_component_configs(
211+
config: &ParsedConfig,
212+
state: &mut AgentState,
213+
) -> AgentComponentConfigs {
214+
let network_info = config.addresses_group_for_network_type(&state.network_type);
237215

238216
let mut kv_data_node_ids = Vec::new();
239217
let mut kv_data_hosts: HashMap<String, String> = HashMap::new();
@@ -311,7 +289,7 @@ impl AgentInner {
311289
clients.insert(node_id, config);
312290
}
313291

314-
let vbucket_routing_info = if let Some(info) = &state.latest_config.bucket {
292+
let vbucket_routing_info = if let Some(info) = &config.bucket {
315293
VbucketRoutingInfo {
316294
// TODO: Clone
317295
vbucket_info: info.vbucket_map.clone(),
@@ -327,7 +305,7 @@ impl AgentInner {
327305
};
328306

329307
AgentComponentConfigs {
330-
config_watcher_memd_config: ConfigWatcherMemdConfig {
308+
config_manager_memd_config: ConfigManagerMemdConfig {
331309
endpoints: kv_data_node_ids,
332310
},
333311
kv_client_manager_client_configs: clients,
@@ -339,8 +317,7 @@ impl AgentInner {
339317
search_config: SearchComponentConfig {
340318
endpoints: search_endpoints,
341319
authenticator: state.authenticator.clone(),
342-
vector_search_enabled: state
343-
.latest_config
320+
vector_search_enabled: config
344321
.features
345322
.contains(&ParsedConfigFeature::FtsVectorSearch),
346323
},
@@ -358,11 +335,8 @@ impl AgentInner {
358335
}
359336
}
360337

361-
// TODO: This really shouldn't be async
362-
pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
363-
let guard = self.state.lock().await;
364-
365-
if let Some(bucket) = &guard.latest_config.bucket {
338+
pub fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
339+
if let Some(bucket) = self.config_manager.bucket() {
366340
let mut features = vec![];
367341

368342
for feature in &bucket.features {
@@ -442,7 +416,6 @@ impl Agent {
442416
authenticator: Arc::new(opts.authenticator),
443417
num_pool_connections: 1,
444418
last_clients: Default::default(),
445-
latest_config: ParsedConfig::default(),
446419
network_type: "".to_string(),
447420
client_name: client_name.clone(),
448421
tls_config: opts.tls_config,
@@ -471,12 +444,11 @@ impl Agent {
471444
)
472445
.await?;
473446

474-
state.latest_config = first_config;
475-
476-
let network_type = NetworkTypeHeuristic::identify(&state.latest_config);
447+
let network_type = NetworkTypeHeuristic::identify(&first_config);
477448
state.network_type = network_type;
478449

479-
let agent_component_configs = AgentInner::gen_agent_component_configs(&mut state);
450+
let agent_component_configs =
451+
AgentInner::gen_agent_component_configs(&first_config, &mut state);
480452

481453
let err_map_component_conn_mgr = err_map_component.clone();
482454

@@ -501,11 +473,12 @@ impl Agent {
501473
.await?,
502474
);
503475

504-
let cfg_watcher = Arc::new(ConfigWatcherMemd::new(
505-
agent_component_configs.config_watcher_memd_config,
506-
ConfigWatcherMemdOptions {
476+
let cfg_watcher = Arc::new(ConfigManagerMemd::new(
477+
agent_component_configs.config_manager_memd_config,
478+
ConfigManagerMemdOptions {
507479
polling_period: opts.config_poller_config.poll_interval,
508480
kv_client_manager: conn_mgr.clone(),
481+
first_config,
509482
},
510483
));
511484
let vb_router = Arc::new(StdVbucketRouter::new(
@@ -575,7 +548,7 @@ impl Agent {
575548

576549
let inner = Arc::new(AgentInner {
577550
state: Arc::new(Mutex::new(state)),
578-
cfg_watcher: cfg_watcher.clone(),
551+
config_manager: cfg_watcher.clone(),
579552
conn_mgr,
580553
vb_router,
581554
crud,
@@ -596,20 +569,20 @@ impl Agent {
596569

597570
let agent = Agent {
598571
inner,
599-
config_watcher_shutdown_tx: shutdown_tx,
572+
config_manager_shutdown_tx: shutdown_tx,
600573
};
601574

602-
agent.start_config_watcher(cfg_watcher, shutdown_rx);
575+
agent.start_config_manager(cfg_watcher, shutdown_rx);
603576

604577
Ok(agent)
605578
}
606579

607-
fn start_config_watcher(
580+
fn start_config_manager(
608581
&self,
609-
config_watcher: Arc<impl ConfigWatcher>,
582+
config_manager: Arc<impl ConfigManager>,
610583
shutdown_rx: Receiver<()>,
611584
) {
612-
let mut watch_rx = config_watcher.watch(shutdown_rx);
585+
let mut watch_rx = config_manager.watch(shutdown_rx);
613586

614587
let inner = self.inner.clone();
615588
tokio::spawn(async move {
@@ -668,7 +641,12 @@ impl Agent {
668641
Err(_e) => continue,
669642
};
670643

671-
let raw_config = match client.get_cluster_config(GetClusterConfigRequest {}).await {
644+
let raw_config = match client
645+
.get_cluster_config(GetClusterConfigRequest {
646+
known_version: None,
647+
})
648+
.await
649+
{
672650
Ok(resp) => resp.config,
673651
Err(_e) => continue,
674652
};
@@ -821,7 +799,7 @@ impl Agent {
821799
}
822800

823801
pub async fn close(&mut self) {
824-
self.config_watcher_shutdown_tx.send(()).unwrap_or_default();
802+
self.config_manager_shutdown_tx.send(()).unwrap_or_default();
825803

826804
self.inner.conn_mgr.close().await.unwrap_or_default();
827805
}
@@ -837,7 +815,7 @@ struct FirstHttpConfig {
837815

838816
// impl Drop for Agent {
839817
// fn drop(&mut self) {
840-
// self.config_watcher_shutdown_tx.send(()).unwrap_or_default();
818+
// self.config_manager_shutdown_tx.send(()).unwrap_or_default();
841819
//
842820
// block_on(async { self.inner.conn_mgr.close().await }).unwrap_or_default();
843821
// }

sdk/couchbase-core/src/agent_ops.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ use crate::searchx::document_analysis::DocumentAnalysis;
4747
use crate::{searchmgmt_options, searchx};
4848

4949
impl Agent {
50-
pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
51-
self.inner.bucket_features().await
50+
pub fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
51+
self.inner.bucket_features()
5252
}
5353

5454
pub async fn upsert(&self, opts: UpsertOptions<'_>) -> Result<UpsertResult> {

0 commit comments

Comments
 (0)