Skip to content

Commit 8ae1f8d

Browse files
committed
Improve query test reliability
1 parent 6a80dc2 commit 8ae1f8d

File tree

12 files changed

+310
-36
lines changed

12 files changed

+310
-36
lines changed

sdk/couchbase-core/src/agent_ops.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use crate::mgmtx::user::{Group, RoleAndDescription, UserAndMetadata};
3333
use crate::querycomponent::QueryResultStream;
3434
use crate::queryoptions::{
3535
BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
36-
DropPrimaryIndexOptions, GetAllIndexesOptions, QueryOptions, WatchIndexesOptions,
36+
DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
37+
WatchIndexesOptions,
3738
};
3839
use crate::queryx::index::Index;
3940
use crate::searchcomponent::SearchResultStream;
@@ -161,6 +162,10 @@ impl Agent {
161162
self.inner.query.watch_indexes(opts).await
162163
}
163164

165+
pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> Result<()> {
166+
self.inner.query.ensure_index(opts).await
167+
}
168+
164169
pub async fn search(&self, opts: SearchOptions) -> Result<SearchResultStream> {
165170
self.inner.search.query(opts).await
166171
}

sdk/couchbase-core/src/httpcomponent.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
use std::collections::HashMap;
2-
use std::future::Future;
3-
use std::hash::Hash;
4-
use std::sync::{Arc, Mutex};
5-
61
use crate::authenticator::Authenticator;
72
use crate::error;
83
use crate::error::ErrorKind;
94
use crate::httpx::client::Client;
105
use crate::retrybesteffort::BackoffCalculator;
116
use crate::service_type::ServiceType;
127
use crate::util::get_host_port_from_uri;
8+
use log::debug;
139
use rand::Rng;
10+
use std::collections::HashMap;
11+
use std::future::Future;
12+
use std::hash::Hash;
13+
use std::sync::{Arc, Mutex};
1414

1515
pub(crate) struct HttpComponent<C: Client> {
1616
service_type: ServiceType,
@@ -234,7 +234,7 @@ impl<C: Client> HttpComponent<C> {
234234
Fut: Future<Output = error::Result<bool>> + Send,
235235
T: NodeTarget,
236236
{
237-
let mut attempt_idx = 0;
237+
let mut attempt_idx = 1;
238238
loop {
239239
let (client, targets) = self.get_all_targets::<T>(&[])?;
240240

@@ -243,7 +243,13 @@ impl<C: Client> HttpComponent<C> {
243243
return Ok(());
244244
}
245245

246-
tokio::time::sleep(backoff.backoff(attempt_idx)).await;
246+
let sleep = backoff.backoff(attempt_idx);
247+
debug!(
248+
"Retrying ensure_resource, after {:?}, attempt number: {}",
249+
sleep, attempt_idx
250+
);
251+
252+
tokio::time::sleep(sleep).await;
247253
attempt_idx += 1;
248254
}
249255
}

sdk/couchbase-core/src/memdx/client.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,6 @@ impl Client {
203203
}
204204
}
205205

206-
trace!(
207-
"Sending response on {}. Opcode={}. Opaque={}. Status={}",
208-
opts.client_id,
209-
packet.op_code,
210-
packet.opaque,
211-
packet.status,
212-
);
213-
214206
let resp = ClientResponse::new(packet, context.context.clone());
215207
match sender.send(Ok(resp)).await {
216208
Ok(_) => {}

sdk/couchbase-core/src/querycomponent.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
1-
use std::collections::HashMap;
2-
use std::future::Future;
3-
use std::sync::{Arc, Mutex};
4-
51
use crate::authenticator::Authenticator;
62
use crate::error;
73
use crate::error::ErrorKind;
84
use crate::httpcomponent::{HttpComponent, HttpComponentState};
95
use crate::httpx::client::Client;
6+
use crate::mgmtx::node_target::NodeTarget;
107
use crate::queryoptions::{
118
BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
12-
DropPrimaryIndexOptions, GetAllIndexesOptions, QueryOptions, WatchIndexesOptions,
9+
DropPrimaryIndexOptions, EnsureIndexOptions, GetAllIndexesOptions, QueryOptions,
10+
WatchIndexesOptions,
1311
};
12+
use crate::queryx::ensure_index_helper::EnsureIndexHelper;
1413
use crate::queryx::index::Index;
1514
use crate::queryx::preparedquery::{PreparedQuery, PreparedStatementCache};
1615
use crate::queryx::query::Query;
16+
use crate::queryx::query_options::EnsureIndexPollOptions;
1717
use crate::queryx::query_respreader::QueryRespReader;
1818
use crate::queryx::query_result::{EarlyMetaData, MetaData};
1919
use crate::retry::{orchestrate_retries, RetryInfo, RetryManager, DEFAULT_RETRY_STRATEGY};
20+
use crate::retrybesteffort::ExponentialBackoffCalculator;
2021
use crate::service_type::ServiceType;
2122
use bytes::Bytes;
2223
use futures::{Stream, StreamExt};
24+
use std::collections::HashMap;
25+
use std::future::Future;
26+
use std::sync::{Arc, Mutex};
27+
use std::time::Duration;
2328

2429
pub(crate) struct QueryComponent<C: Client> {
2530
http_component: HttpComponent<C>,
@@ -396,6 +401,37 @@ impl<C: Client> QueryComponent<C> {
396401
.await
397402
}
398403

404+
pub async fn ensure_index(&self, opts: &EnsureIndexOptions<'_>) -> error::Result<()> {
405+
let mut helper = EnsureIndexHelper::new(
406+
self.http_component.user_agent(),
407+
opts.index_name,
408+
opts.bucket_name,
409+
opts.scope_name,
410+
opts.collection_name,
411+
opts.on_behalf_of_info,
412+
);
413+
414+
let backoff = ExponentialBackoffCalculator::new(
415+
Duration::from_millis(100),
416+
Duration::from_millis(1000),
417+
1.5,
418+
);
419+
420+
self.http_component
421+
.ensure_resource(backoff, async |client: Arc<C>, targets: Vec<NodeTarget>| {
422+
helper
423+
.clone()
424+
.poll(&EnsureIndexPollOptions {
425+
client,
426+
targets,
427+
desired_state: opts.desired_state,
428+
})
429+
.await
430+
.map_err(error::Error::from)
431+
})
432+
.await
433+
}
434+
399435
async fn orchestrate_no_res_mgmt_call<Fut>(
400436
&self,
401437
retry_info: RetryInfo,

sdk/couchbase-core/src/queryoptions.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::time::Duration;
55

66
use crate::httpx::request::OnBehalfOfInfo;
77
use crate::queryx;
8+
pub use crate::queryx::ensure_index_helper::DesiredState;
89
use crate::queryx::query_options::{FullScanVectors, SparseScanVectors};
910
use crate::retry::RetryStrategy;
1011

@@ -923,3 +924,33 @@ impl<'a> From<&WatchIndexesOptions<'a>> for queryx::query_options::WatchIndexesO
923924
}
924925
}
925926
}
927+
928+
#[derive(Debug, Clone)]
929+
#[non_exhaustive]
930+
pub struct EnsureIndexOptions<'a> {
931+
pub index_name: &'a str,
932+
pub bucket_name: &'a str,
933+
pub scope_name: Option<&'a str>,
934+
pub collection_name: Option<&'a str>,
935+
pub on_behalf_of_info: Option<&'a OnBehalfOfInfo>,
936+
pub desired_state: DesiredState,
937+
}
938+
939+
impl<'a> EnsureIndexOptions<'a> {
940+
pub fn new(
941+
index_name: &'a str,
942+
bucket_name: &'a str,
943+
scope_name: Option<&'a str>,
944+
collection_name: Option<&'a str>,
945+
desired_state: DesiredState,
946+
) -> Self {
947+
Self {
948+
index_name,
949+
bucket_name,
950+
scope_name,
951+
collection_name,
952+
on_behalf_of_info: None,
953+
desired_state,
954+
}
955+
}
956+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use crate::httpx::client::Client;
2+
use crate::httpx::request::OnBehalfOfInfo;
3+
use crate::mgmtx::node_target::NodeTarget;
4+
use crate::queryx::error;
5+
use crate::queryx::query::Query;
6+
use crate::queryx::query_options::{EnsureIndexPollOptions, GetAllIndexesOptions};
7+
use std::sync::Arc;
8+
9+
#[derive(Debug, Clone)]
10+
pub struct EnsureIndexHelper<'a> {
11+
pub user_agent: &'a str,
12+
pub on_behalf_of_info: Option<&'a OnBehalfOfInfo>,
13+
14+
pub index_name: &'a str,
15+
pub bucket_name: &'a str,
16+
pub scope_name: Option<&'a str>,
17+
pub collection_name: Option<&'a str>,
18+
19+
confirmed_endpoints: Vec<&'a str>,
20+
}
21+
22+
#[derive(Copy, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
23+
pub enum DesiredState {
24+
Created,
25+
Deleted,
26+
}
27+
28+
impl<'a> EnsureIndexHelper<'a> {
29+
pub fn new(
30+
user_agent: &'a str,
31+
index_name: &'a str,
32+
bucket_name: &'a str,
33+
scope_name: Option<&'a str>,
34+
collection_name: Option<&'a str>,
35+
on_behalf_of_info: Option<&'a OnBehalfOfInfo>,
36+
) -> Self {
37+
Self {
38+
user_agent,
39+
on_behalf_of_info,
40+
index_name,
41+
bucket_name,
42+
scope_name,
43+
collection_name,
44+
confirmed_endpoints: vec![],
45+
}
46+
}
47+
48+
async fn poll_one<C: Client>(
49+
&self,
50+
client: Arc<C>,
51+
target: &NodeTarget,
52+
) -> error::Result<bool> {
53+
let resp = Query {
54+
http_client: client,
55+
user_agent: self.user_agent.to_string(),
56+
endpoint: target.endpoint.to_string(),
57+
username: target.username.to_string(),
58+
password: target.password.to_string(),
59+
}
60+
.get_all_indexes(&GetAllIndexesOptions {
61+
bucket_name: self.bucket_name,
62+
scope_name: self.scope_name,
63+
collection_name: self.collection_name,
64+
on_behalf_of: self.on_behalf_of_info,
65+
})
66+
.await?;
67+
68+
for index in resp {
69+
// Indexes here should already be scoped to the bucket, scope, and collection.
70+
if index.name == self.index_name {
71+
return Ok(true);
72+
}
73+
}
74+
75+
Ok(false)
76+
}
77+
78+
pub async fn poll<C: Client>(
79+
&mut self,
80+
opts: &'a EnsureIndexPollOptions<C>,
81+
) -> error::Result<bool> {
82+
let mut filtered_targets = Vec::with_capacity(opts.targets.len());
83+
84+
for target in &opts.targets {
85+
if !self.confirmed_endpoints.contains(&target.endpoint.as_str()) {
86+
filtered_targets.push(target);
87+
}
88+
}
89+
90+
let mut success_endpoints = Vec::new();
91+
for target in &opts.targets {
92+
let exists = self.poll_one(opts.client.clone(), target).await?;
93+
94+
match opts.desired_state {
95+
DesiredState::Created => {
96+
if exists {
97+
success_endpoints.push(target.endpoint.as_str());
98+
}
99+
}
100+
DesiredState::Deleted => {
101+
if !exists {
102+
success_endpoints.push(target.endpoint.as_str());
103+
}
104+
}
105+
}
106+
}
107+
108+
self.confirmed_endpoints
109+
.extend_from_slice(success_endpoints.as_slice());
110+
111+
Ok(success_endpoints.len() == filtered_targets.len())
112+
}
113+
}

sdk/couchbase-core/src/queryx/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod ensure_index_helper;
12
pub mod error;
23
pub mod index;
34
pub mod preparedquery;

sdk/couchbase-core/src/queryx/query_options.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
use std::collections::HashMap;
2+
use std::sync::Arc;
23
use std::time::Duration;
34

45
use serde::ser::{SerializeMap, SerializeSeq};
56
use serde::{Serialize, Serializer};
67
use serde_json::Value;
78

89
use crate::helpers;
10+
use crate::httpx::client::Client;
911
use crate::httpx::request::OnBehalfOfInfo;
12+
use crate::mgmtx::node_target::NodeTarget;
13+
use crate::queryx::ensure_index_helper::DesiredState;
1014

1115
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1216
#[serde(rename_all = "snake_case")]
@@ -858,3 +862,10 @@ impl<'a> WatchIndexesOptions<'a> {
858862
self
859863
}
860864
}
865+
866+
#[derive(Debug, Clone, Eq, PartialEq)]
867+
pub struct EnsureIndexPollOptions<C: Client> {
868+
pub desired_state: DesiredState,
869+
pub client: Arc<C>,
870+
pub targets: Vec<NodeTarget>,
871+
}

sdk/couchbase-core/tests/common/test_agent.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,10 @@ impl TestAgent {
369369
run_with_std_ensure_deadline(self.agent.ensure_bucket(opts)).await
370370
}
371371

372-
pub async fn ensure_search_index(&self, opts: &EnsureIndexOptions<'_>) -> Result<()> {
372+
pub async fn ensure_search_index(
373+
&self,
374+
opts: &searchmgmt_options::EnsureIndexOptions<'_>,
375+
) -> Result<()> {
373376
run_with_std_ensure_deadline(self.agent.ensure_search_index(opts)).await
374377
}
375378
}

0 commit comments

Comments
 (0)