diff --git a/sdk/couchbase-core/src/agent_ops.rs b/sdk/couchbase-core/src/agent_ops.rs index 6f747f63..85c00923 100644 --- a/sdk/couchbase-core/src/agent_ops.rs +++ b/sdk/couchbase-core/src/agent_ops.rs @@ -33,7 +33,8 @@ use crate::mgmtx::user::{Group, RoleAndDescription, UserAndMetadata}; use crate::querycomponent::QueryResultStream; use crate::queryoptions::{ BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions, - DropPrimaryIndexOptions, GetAllIndexesOptions, QueryOptions, WatchIndexesOptions, + DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions, + WatchIndexesOptions, }; use crate::queryx::index::Index; use crate::searchcomponent::SearchResultStream; @@ -161,6 +162,10 @@ impl Agent { self.inner.query.watch_indexes(opts).await } + pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> Result<()> { + self.inner.query.ensure_index(opts).await + } + pub async fn search(&self, opts: SearchOptions) -> Result { self.inner.search.query(opts).await } diff --git a/sdk/couchbase-core/src/httpcomponent.rs b/sdk/couchbase-core/src/httpcomponent.rs index 540dcf1b..0413728c 100644 --- a/sdk/couchbase-core/src/httpcomponent.rs +++ b/sdk/couchbase-core/src/httpcomponent.rs @@ -1,8 +1,3 @@ -use std::collections::HashMap; -use std::future::Future; -use std::hash::Hash; -use std::sync::{Arc, Mutex}; - use crate::authenticator::Authenticator; use crate::error; use crate::error::ErrorKind; @@ -10,7 +5,12 @@ use crate::httpx::client::Client; use crate::retrybesteffort::BackoffCalculator; use crate::service_type::ServiceType; use crate::util::get_host_port_from_uri; +use log::debug; use rand::Rng; +use std::collections::HashMap; +use std::future::Future; +use std::hash::Hash; +use std::sync::{Arc, Mutex}; pub(crate) struct HttpComponent { service_type: ServiceType, @@ -243,7 +243,13 @@ impl HttpComponent { return Ok(()); } - tokio::time::sleep(backoff.backoff(attempt_idx)).await; + let sleep = backoff.backoff(attempt_idx); + debug!( + "Retrying ensure_resource, after {:?}, attempt number: {}", + sleep, attempt_idx + ); + + tokio::time::sleep(sleep).await; attempt_idx += 1; } } diff --git a/sdk/couchbase-core/src/memdx/client.rs b/sdk/couchbase-core/src/memdx/client.rs index 7192fb2c..586828e5 100644 --- a/sdk/couchbase-core/src/memdx/client.rs +++ b/sdk/couchbase-core/src/memdx/client.rs @@ -203,14 +203,6 @@ impl Client { } } - trace!( - "Sending response on {}. Opcode={}. Opaque={}. Status={}", - opts.client_id, - packet.op_code, - packet.opaque, - packet.status, - ); - let resp = ClientResponse::new(packet, context.context.clone()); match sender.send(Ok(resp)).await { Ok(_) => {} diff --git a/sdk/couchbase-core/src/querycomponent.rs b/sdk/couchbase-core/src/querycomponent.rs index 42c22af7..66df2a30 100644 --- a/sdk/couchbase-core/src/querycomponent.rs +++ b/sdk/couchbase-core/src/querycomponent.rs @@ -1,25 +1,30 @@ -use std::collections::HashMap; -use std::future::Future; -use std::sync::{Arc, Mutex}; - use crate::authenticator::Authenticator; use crate::error; use crate::error::ErrorKind; use crate::httpcomponent::{HttpComponent, HttpComponentState}; use crate::httpx::client::Client; +use crate::mgmtx::node_target::NodeTarget; use crate::queryoptions::{ BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions, - DropPrimaryIndexOptions, GetAllIndexesOptions, QueryOptions, WatchIndexesOptions, + DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions, + WatchIndexesOptions, }; +use crate::queryx::ensure_index_helper::EnsureIndexHelper; use crate::queryx::index::Index; use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache}; use crate::queryx::query::Query; +use crate::queryx::query_options::EnsureIndexPollOptions; use crate::queryx::query_respreader::QueryRespReader; use crate::queryx::query_result::{EarlyMetaData, MetaData}; use crate::retry::{orchestrate_retries, RetryInfo, RetryManager, DEFAULT_RETRY_STRATEGY}; +use crate::retrybesteffort::ExponentialBackoffCalculator; use crate::service_type::ServiceType; use bytes::Bytes; use futures::{Stream, StreamExt}; +use std::collections::HashMap; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::time::Duration; pub(crate) struct QueryComponent { http_component: HttpComponent, @@ -396,6 +401,37 @@ impl QueryComponent { .await } + pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> { + let mut helper = EnsureIndexHelper::new( + self.http_component.user_agent(), + opts.index_name, + opts.bucket_name, + opts.scope_name, + opts.collection_name, + opts.on_behalf_of_info, + ); + + let backoff = ExponentialBackoffCalculator::new( + Duration::from_millis(100), + Duration::from_millis(1000), + 1.5, + ); + + self.http_component + .ensure_resource(backoff, async |client: Arc, targets: Vec| { + helper + .clone() + .poll(&EnsureIndexPollOptions { + client, + targets, + desired_state: opts.desired_state, + }) + .await + .map_err(error::Error::from) + }) + .await + } + async fn orchestrate_no_res_mgmt_call( &self, retry_info: RetryInfo, diff --git a/sdk/couchbase-core/src/queryoptions.rs b/sdk/couchbase-core/src/queryoptions.rs index 372060cc..0a065ac2 100644 --- a/sdk/couchbase-core/src/queryoptions.rs +++ b/sdk/couchbase-core/src/queryoptions.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crate::httpx::request::OnBehalfOfInfo; use crate::queryx; +pub use crate::queryx::ensure_index_helper::DesiredState; use crate::queryx::query_options::{FullScanVectors, SparseScanVectors}; use crate::retry::RetryStrategy; @@ -923,3 +924,33 @@ impl<'a> From<&WatchIndexesOptions<'a>> for queryx::query_options::WatchIndexesO } } } + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct EnsureIndexOptions<'a> { + pub index_name: &'a str, + pub bucket_name: &'a str, + pub scope_name: Option<&'a str>, + pub collection_name: Option<&'a str>, + pub on_behalf_of_info: Option<&'a OnBehalfOfInfo>, + pub desired_state: DesiredState, +} + +impl<'a> EnsureIndexOptions<'a> { + pub fn new( + index_name: &'a str, + bucket_name: &'a str, + scope_name: Option<&'a str>, + collection_name: Option<&'a str>, + desired_state: DesiredState, + ) -> Self { + Self { + index_name, + bucket_name, + scope_name, + collection_name, + on_behalf_of_info: None, + desired_state, + } + } +} diff --git a/sdk/couchbase-core/src/queryx/ensure_index_helper.rs b/sdk/couchbase-core/src/queryx/ensure_index_helper.rs new file mode 100644 index 00000000..41760947 --- /dev/null +++ b/sdk/couchbase-core/src/queryx/ensure_index_helper.rs @@ -0,0 +1,113 @@ +use crate::httpx::client::Client; +use crate::httpx::request::OnBehalfOfInfo; +use crate::mgmtx::node_target::NodeTarget; +use crate::queryx::error; +use crate::queryx::query::Query; +use crate::queryx::query_options::{EnsureIndexPollOptions, GetAllIndexesOptions}; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct EnsureIndexHelper<'a> { + pub user_agent: &'a str, + pub on_behalf_of_info: Option<&'a OnBehalfOfInfo>, + + pub index_name: &'a str, + pub bucket_name: &'a str, + pub scope_name: Option<&'a str>, + pub collection_name: Option<&'a str>, + + confirmed_endpoints: Vec<&'a str>, +} + +#[derive(Copy, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] +pub enum DesiredState { + Created, + Deleted, +} + +impl<'a> EnsureIndexHelper<'a> { + pub fn new( + user_agent: &'a str, + index_name: &'a str, + bucket_name: &'a str, + scope_name: Option<&'a str>, + collection_name: Option<&'a str>, + on_behalf_of_info: Option<&'a OnBehalfOfInfo>, + ) -> Self { + Self { + user_agent, + on_behalf_of_info, + index_name, + bucket_name, + scope_name, + collection_name, + confirmed_endpoints: vec![], + } + } + + async fn poll_one( + &self, + client: Arc, + target: &NodeTarget, + ) -> error::Result { + let resp = Query { + http_client: client, + user_agent: self.user_agent.to_string(), + endpoint: target.endpoint.to_string(), + username: target.username.to_string(), + password: target.password.to_string(), + } + .get_all_indexes(&GetAllIndexesOptions { + bucket_name: self.bucket_name, + scope_name: self.scope_name, + collection_name: self.collection_name, + on_behalf_of: self.on_behalf_of_info, + }) + .await?; + + for index in resp { + // Indexes here should already be scoped to the bucket, scope, and collection. + if index.name == self.index_name { + return Ok(true); + } + } + + Ok(false) + } + + pub async fn poll( + &mut self, + opts: &'a EnsureIndexPollOptions, + ) -> error::Result { + let mut filtered_targets = Vec::with_capacity(opts.targets.len()); + + for target in &opts.targets { + if !self.confirmed_endpoints.contains(&target.endpoint.as_str()) { + filtered_targets.push(target); + } + } + + let mut success_endpoints = Vec::new(); + for target in &opts.targets { + let exists = self.poll_one(opts.client.clone(), target).await?; + + match opts.desired_state { + DesiredState::Created => { + if exists { + success_endpoints.push(target.endpoint.as_str()); + } + } + DesiredState::Deleted => { + if !exists { + success_endpoints.push(target.endpoint.as_str()); + } + } + } + } + + self.confirmed_endpoints + .extend_from_slice(success_endpoints.as_slice()); + + Ok(success_endpoints.len() == filtered_targets.len()) + } +} diff --git a/sdk/couchbase-core/src/queryx/mod.rs b/sdk/couchbase-core/src/queryx/mod.rs index a9b55f4e..ae85c6f2 100644 --- a/sdk/couchbase-core/src/queryx/mod.rs +++ b/sdk/couchbase-core/src/queryx/mod.rs @@ -1,3 +1,4 @@ +pub mod ensure_index_helper; pub mod error; pub mod index; pub mod preparedquery; diff --git a/sdk/couchbase-core/src/queryx/query_options.rs b/sdk/couchbase-core/src/queryx/query_options.rs index 1ff187b7..884d49a6 100644 --- a/sdk/couchbase-core/src/queryx/query_options.rs +++ b/sdk/couchbase-core/src/queryx/query_options.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use serde::ser::{SerializeMap, SerializeSeq}; @@ -6,7 +7,10 @@ use serde::{Serialize, Serializer}; use serde_json::Value; use crate::helpers; +use crate::httpx::client::Client; use crate::httpx::request::OnBehalfOfInfo; +use crate::mgmtx::node_target::NodeTarget; +use crate::queryx::ensure_index_helper::DesiredState; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] #[serde(rename_all = "snake_case")] @@ -858,3 +862,10 @@ impl<'a> WatchIndexesOptions<'a> { self } } + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct EnsureIndexPollOptions { + pub desired_state: DesiredState, + pub client: Arc, + pub targets: Vec, +} diff --git a/sdk/couchbase-core/tests/common/test_agent.rs b/sdk/couchbase-core/tests/common/test_agent.rs index 1211ff7e..74c43ec0 100644 --- a/sdk/couchbase-core/tests/common/test_agent.rs +++ b/sdk/couchbase-core/tests/common/test_agent.rs @@ -369,7 +369,10 @@ impl TestAgent { run_with_std_ensure_deadline(self.agent.ensure_bucket(opts)).await } - pub async fn ensure_search_index(&self, opts: &EnsureIndexOptions<'_>) -> Result<()> { + pub async fn ensure_search_index( + &self, + opts: &searchmgmt_options::EnsureIndexOptions<'_>, + ) -> Result<()> { run_with_std_ensure_deadline(self.agent.ensure_search_index(opts)).await } } diff --git a/sdk/couchbase-core/tests/query.rs b/sdk/couchbase-core/tests/query.rs index 24b0ae9e..b7d7c6d4 100644 --- a/sdk/couchbase-core/tests/query.rs +++ b/sdk/couchbase-core/tests/query.rs @@ -3,14 +3,16 @@ extern crate core; use futures::StreamExt; use serde_json::Value; +use crate::common::helpers::generate_string_key; +use crate::common::test_config::run_test; use couchbase_core::queryoptions::{ BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions, - DropPrimaryIndexOptions, GetAllIndexesOptions, QueryOptions, WatchIndexesOptions, + DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions, + WatchIndexesOptions, }; +use couchbase_core::queryx::ensure_index_helper::DesiredState; use couchbase_core::queryx::query_result::Status; -use crate::common::test_config::run_test; - mod common; #[test] @@ -110,22 +112,50 @@ fn test_query_indexes() { agent.create_primary_index(&opts).await.unwrap(); - let opts = CreateIndexOptions::new(bucket_name.as_str(), "test_index", &["name"]) + agent + .ensure_index(&EnsureIndexOptions::new( + "#primary", + bucket_name.as_str(), + None, + None, + DesiredState::Created, + )) + .await + .unwrap(); + + let index_name = generate_string_key(); + let index_name = index_name.as_str(); + + let opts = CreateIndexOptions::new(bucket_name.as_str(), index_name, &["name"]) .ignore_if_exists(true) .deferred(true); agent.create_index(&opts).await.unwrap(); + agent + .ensure_index(&EnsureIndexOptions::new( + index_name, + bucket_name.as_str(), + None, + None, + DesiredState::Created, + )) + .await + .unwrap(); + let opts = GetAllIndexesOptions::new(bucket_name.as_str()); let indexes = agent.get_all_indexes(&opts).await.unwrap(); - assert_eq!(2, indexes.len()); + + let num_indexes = indexes.len(); + assert!(num_indexes >= 2); let opts = BuildDeferredIndexesOptions::new(bucket_name.as_str()); agent.build_deferred_indexes(&opts).await.unwrap(); - let opts = WatchIndexesOptions::new(bucket_name.as_str(), &["test_index"]); + let index_names = &[index_name]; + let opts = WatchIndexesOptions::new(bucket_name.as_str(), index_names); tokio::time::timeout( std::time::Duration::from_secs(15), @@ -139,14 +169,41 @@ fn test_query_indexes() { agent.drop_primary_index(&opts).await.unwrap(); + agent + .ensure_index(&EnsureIndexOptions::new( + "#primary", + bucket_name.as_str(), + None, + None, + DesiredState::Deleted, + )) + .await + .unwrap(); + let opts = - DropIndexOptions::new(bucket_name.as_str(), "test_index").ignore_if_not_exists(true); + DropIndexOptions::new(bucket_name.as_str(), index_name).ignore_if_not_exists(true); agent.drop_index(&opts).await.unwrap(); + + agent + .ensure_index(&EnsureIndexOptions::new( + index_name, + bucket_name.as_str(), + None, + None, + DesiredState::Deleted, + )) + .await + .unwrap(); + let opts = GetAllIndexesOptions::new(bucket_name.as_str()); let indexes = agent.get_all_indexes(&opts).await.unwrap(); - assert_eq!(0, indexes.len()); + assert_eq!( + num_indexes - 2, + indexes.len(), + "Indexes were not deleted as expected" + ); }); } diff --git a/sdk/couchbase/tests/query.rs b/sdk/couchbase/tests/query.rs index 3059510e..9f2c4081 100644 --- a/sdk/couchbase/tests/query.rs +++ b/sdk/couchbase/tests/query.rs @@ -166,7 +166,7 @@ fn test_query_indexes() { // Allow time for server to sync with the new collection try_until( - tokio::time::Instant::now() + Duration::from_secs(5), + tokio::time::Instant::now() + Duration::from_secs(30), Duration::from_millis(100), "Primary index was not created in time", async || { @@ -183,10 +183,29 @@ fn test_query_indexes() { let opts = CreateQueryIndexOptions::new() .ignore_if_exists(true) .deferred(true); - manager - .create_index("test_index".to_string(), vec!["name".to_string()], opts) - .await - .unwrap(); + + // Allow time for server to sync with the new collection, this request might go + // somewhere different to the one above. + try_until( + tokio::time::Instant::now() + Duration::from_secs(30), + Duration::from_millis(100), + "Primary index was not created in time", + async || { + let res = manager + .create_index( + "test_index".to_string(), + vec!["name".to_string()], + opts.clone(), + ) + .await; + if res.is_ok() { + Ok(Some(())) + } else { + Ok(None) + } + }, + ) + .await; let indexes = manager.get_all_indexes(None).await.unwrap(); diff --git a/sdk/couchbase/tests/search.rs b/sdk/couchbase/tests/search.rs index 3628e3ed..9854f0d1 100644 --- a/sdk/couchbase/tests/search.rs +++ b/sdk/couchbase/tests/search.rs @@ -108,7 +108,7 @@ fn test_search_basic() { SearchRequest::with_search_query(Query::Term(query.clone())), SearchOptions::new() .include_locations(true) - .server_timeout(Duration::from_secs(5)) + .server_timeout(Duration::from_secs(10)) .facets(facets.clone()) .sort(vec![sort.clone()]) .fields(vec!["city".to_string()]),