Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure_data_cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Adjusted the query engine abstraction to support future enhancements and optimizations. ([#3166](https://github.com/Azure/azure-sdk-for-rust/pull/3166))

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl CosmosClient {
/// use azure_core::credentials::Secret;
///
/// let client = CosmosClient::with_connection_string(
/// "AccountEndpoint=https://accountname.documents.azure.com:443/‌​;AccountKey=accountk‌​ey",
/// Secret::from("AccountEndpoint=https://accountname.documents.azure.com:443/‌​;AccountKey=accountk‌​ey"),
/// None)
/// .unwrap();
/// ```
Expand Down
39 changes: 38 additions & 1 deletion sdk/cosmos/azure_data_cosmos/src/query/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,42 @@ pub struct QueryRequest {
/// The ID of the partition key range to query.
pub partition_key_range_id: String,

/// The index of this request, within the partition key range.
///
/// This value will always increase for subsequent requests for the same partition key range.
/// It must be provided back to the pipeline when providing data, so that the pipeline can ensure that data is provided in order.
pub index: usize,

/// The continuation to use, if any.
pub continuation: Option<String>,

/// The query to execute for this partition key range, if different from the original query.
pub query: Option<String>,

/// If a query is specified, this flag indicates if the query parameters should be included with that query.
///
/// Sometimes, when an override query is specified, it differs in structure from the original query, and the original parameters are not valid.
pub include_parameters: bool,

/// If specified, indicates that the SDK should IMMEDIATELY drain all remaining results from this partition key range, following continuation tokens, until no more results are available.
/// All the data from this partition key range should be provided BEFORE any new items will be made available.
///
/// This allows engines to optimize for non-streaming scenarios, where the entire result set must be provided to the engine before it can make progress.
pub drain: bool,
}

/// The request of a single-partition query for a specific partition key range.
pub struct QueryResult<'a> {
/// The ID of the partition key range that was queried.
pub partition_key_range_id: &'a str,

/// The index of the [`QueryRequest`] that generated this result.
pub request_index: usize,

/// The continuation token to be used for the next request, if any.
pub next_continuation: Option<String>,

/// The raw body of the response from the query.
pub result: &'a [u8],
}

Expand Down Expand Up @@ -38,7 +66,16 @@ pub trait QueryPipeline: Send {
fn run(&mut self) -> azure_core::Result<PipelineResult>;

/// Provides additional single-partition data to the pipeline.
fn provide_data(&mut self, data: QueryResult) -> azure_core::Result<()>;
///
/// Data from multiple partition ranges may be provided at once.
/// However, each page of data must be provided in order.
/// So, for any given partition key range, page n's results must be earlier in the `data` vector than page n+1's results.
/// Data from different partition key ranges may be interleaved, as long as each partition key range's pages are in order.
///
/// The pipeline will use the [`QueryResult::request_index`] field to validate this.
///
/// When providing data from a draining request (i.e. a request with `drain = true`), all pages for that draining request can share the same [`QueryResult::request_index`].
fn provide_data(&mut self, data: Vec<QueryResult>) -> azure_core::Result<()>;
}

/// Provides an interface to a query engine, which constructs query pipelines.
Expand Down
73 changes: 49 additions & 24 deletions sdk/cosmos/azure_data_cosmos/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,36 +128,61 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
}

// No items, so make any requests we need to make and provide them to the pipeline.
// TODO: We can absolutely parallelize these requests.
for request in results.requests {
let mut query_request = base_request.clone();
let mut query_request = if let Some(query) = request.query {
let mut query = Query::from(query);
if request.include_parameters {
query = query.with_parameters_from(&self.query)
}
crate::pipeline::create_base_query_request(
self.http_pipeline.url(&self.items_link),
&query,
)?
} else {
base_request.clone()
};

query_request.insert_header(
constants::PARTITION_KEY_RANGE_ID,
request.partition_key_range_id.clone(),
);
if let Some(continuation) = request.continuation {
query_request.insert_header(constants::CONTINUATION, continuation);
}

let resp = self
.http_pipeline
.send_raw(
self.context.to_borrowed(),
&mut query_request,
self.items_link.clone(),
)
.await?;

let next_continuation =
resp.headers().get_optional_string(&constants::CONTINUATION);
let body = resp.into_body();

let result = QueryResult {
partition_key_range_id: &request.partition_key_range_id,
next_continuation,
result: &body,
};

pipeline.provide_data(result)?;
let mut fetch_more_pages = true;
while fetch_more_pages {
if let Some(c) = request.continuation.clone() {
query_request.insert_header(constants::CONTINUATION, c);
} else {
// Make sure we don't send a continuation header if we don't have one, even if we did on a previous iteration.
query_request.headers_mut().remove(constants::CONTINUATION);
}

let resp = self
.http_pipeline
.send_raw(
self.context.to_borrowed(),
&mut query_request,
self.items_link.clone(),
)
.await?;

let next_continuation =
resp.headers().get_optional_string(&constants::CONTINUATION);

fetch_more_pages = request.drain && next_continuation.is_some();

let body = resp.into_body();
let result = QueryResult {
partition_key_range_id: &request.partition_key_range_id,
request_index: request.index,
next_continuation,
result: &body,
};

// For now, just provide a single result at a time.
// When we parallelize requests, we can more easily provide multiple results at once.
pipeline.provide_data(vec![result])?;
}
}

// No items, but we provided more data (probably), so continue the loop.
Expand Down
42 changes: 42 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ impl Query {
Ok(self)
}

/// Replaces all parameters in this [`Query`] instance with the parameters from another [`Query`] instance, and returns it.
///
/// Since the parameters in the other query are already serialized, this method cannot fail.
#[cfg(feature = "preview_query_engine")] // Crate-private for now, and thus only in the preview_query_engine feature (which is the only place it's used).
pub(crate) fn with_parameters_from(mut self, other: &Query) -> Self {
self.parameters = other.parameters.clone();
self
}

/// Consumes this [`Query`] instance, replaces its text with the provided value, and returns it.
pub fn with_text(mut self, text: String) -> Self {
self.text = text;
Expand Down Expand Up @@ -248,4 +257,37 @@ mod tests {
assert_eq!(query.parameters.len(), 2);
Ok(())
}

#[test]
#[cfg(feature = "preview_query_engine")]
pub fn with_parameters_from_replaces_all_parameters() -> Result<(), Box<dyn Error>> {
let source_query = Query::from("SELECT * FROM c")
.with_parameter("@id", 42)?
.with_parameter("@name", "Contoso")?;

let target_query = Query::from("SELECT c.value FROM c WHERE c.id = @id AND c.name = @name")
.with_parameter("@old_param", "old_value")?
.with_parameters_from(&source_query);

// Check that the text is preserved from the target query
assert_eq!(
target_query.text,
"SELECT c.value FROM c WHERE c.id = @id AND c.name = @name"
);

// Check that parameters are replaced with those from source query
assert_eq!(target_query.parameters.len(), 2);
assert_eq!(target_query.parameters[0].name, "@id");
assert_eq!(
target_query.parameters[0].value,
serde_json::Value::Number(serde_json::Number::from(42))
);
assert_eq!(target_query.parameters[1].name, "@name");
assert_eq!(
target_query.parameters[1].value,
serde_json::Value::String("Contoso".to_string())
);

Ok(())
}
}
23 changes: 12 additions & 11 deletions sdk/cosmos/azure_data_cosmos/tests/cosmos_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use std::error::Error;

use azure_core::{error::ErrorResponse, http::StatusCode};
use azure_core::http::StatusCode;
use azure_core_test::{recorded, TestContext};
use azure_data_cosmos::Query;
use azure_data_cosmos::{constants, Query};
use framework::{test_data, MockItem, TestAccount};
use futures::TryStreamExt;

Expand Down Expand Up @@ -156,15 +156,16 @@ pub async fn cross_partition_query_with_order_by_fails_without_query_engine(
};
assert_eq!(Some(StatusCode::BadRequest), err.http_status());

let error_response = ErrorResponse::try_from(err).expect("expected an HttpResponse error");
let message = error_response
.error
.expect("error should be present")
.message
.expect("message should be present");
assert!(message.starts_with(
"The provided cross partition query can not be directly served by the gateway."
));
let response =
if let azure_core::error::ErrorKind::HttpResponse { raw_response, .. } = err.kind() {
raw_response.as_ref().unwrap().clone()
} else {
panic!("expected an HTTP response error");
};
let sub_status = response.headers().get_optional_str(&constants::SUB_STATUS);

// 1004 = CrossPartitionQueryNotServable
assert_eq!(Some("1004"), sub_status);

account.cleanup().await?;
Ok(())
Expand Down
Loading