From e0636120ddfc85d3ed1768ec1db8f755e7be7900 Mon Sep 17 00:00:00 2001
From: Ashley Stanton-Nurse
Date: Thu, 9 Oct 2025 17:53:17 +0000
Subject: [PATCH 1/7] allow query engine to provide a custom query for each request

---
 .../azure_data_cosmos/src/query/engine.rs     |   8 +
 .../azure_data_cosmos/src/query/executor.rs   |  14 +-
 sdk/cosmos/azure_data_cosmos/src/query/mod.rs |  42 ++++-
 .../tests/framework/query_engine.rs           |  41 ++++-
 .../azure_data_cosmos/tests/query_engine.rs   | 172 ++++++++++++++++++
 5 files changed, 273 insertions(+), 4 deletions(-)

diff --git a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs
index 8cfe60e7f1..22a8c5c5cf 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs
@@ -5,6 +5,14 @@ pub struct QueryRequest {
     /// The continuation to use, if any.
     pub continuation: Option<String>,
+
+    /// The query to execute for this partition key range, if different from the original query.
+    pub query: Option<String>,
+
+    /// If a query is specified, this flag indicates whether the query parameters should be included with that query.
+    ///
+    /// Sometimes, when an override query is specified, it differs in structure from the original query, and the original parameters are not valid.
+    pub include_parameters: bool,
 }
 
 /// The request of a single-partition query for a specific partition key range.
diff --git a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
index f0e93b326d..e2ed9908db 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
@@ -129,7 +129,19 @@ impl QueryExecutor {
 
         // No items, so make any requests we need to make and provide them to the pipeline.
         for request in results.requests {
-            let mut query_request = base_request.clone();
+            let mut query_request = if let Some(query) = request.query {
+                let mut query = Query::from(query);
+                if request.include_parameters {
+                    query = query.with_parameters_from(&self.query)
+                }
+                crate::pipeline::create_base_query_request(
+                    self.http_pipeline.url(&self.items_link),
+                    &query,
+                )?
+            } else {
+                base_request.clone()
+            };
+
             query_request.insert_header(
                 constants::PARTITION_KEY_RANGE_ID,
                 request.partition_key_range_id.clone(),
diff --git a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
index 4fd1b7ce40..92dbbcd4ec 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
@@ -108,6 +108,14 @@ impl Query {
         Ok(self)
     }
 
+    /// Replaces all parameters in this [`Query`] instance with the parameters from another [`Query`] instance, and returns it.
+    ///
+    /// Since the parameters in the other query are already serialized, this method cannot fail.
+    pub fn with_parameters_from(mut self, other: &Query) -> Self {
+        self.parameters = other.parameters.clone();
+        self
+    }
+
     /// Consumes this [`Query`] instance, replaces its text with the provided value, and returns it.
     pub fn with_text(mut self, text: String) -> Self {
         self.text = text;
@@ -133,7 +141,7 @@ impl<T: Into<String>> From<T> for Query {
 /// Represents a single parameter in a Cosmos DB query.
 #[derive(Clone, Debug, Serialize)]
-struct QueryParameter {
+pub(crate) struct QueryParameter {
     name: String,
     value: serde_json::Value,
 }
@@ -248,4 +256,36 @@ mod tests {
         assert_eq!(query.parameters.len(), 2);
         Ok(())
     }
+
+    #[test]
+    pub fn with_parameters_from_replaces_all_parameters() -> Result<(), Box<dyn std::error::Error>> {
+        let source_query = Query::from("SELECT * FROM c")
+            .with_parameter("@id", 42)?
+            .with_parameter("@name", "Contoso")?;
+
+        let target_query = Query::from("SELECT c.value FROM c WHERE c.id = @id AND c.name = @name")
+            .with_parameter("@old_param", "old_value")?
+            .with_parameters_from(&source_query);
+
+        // Check that the text is preserved from the target query
+        assert_eq!(
+            target_query.text,
+            "SELECT c.value FROM c WHERE c.id = @id AND c.name = @name"
+        );
+
+        // Check that parameters are replaced with those from source query
+        assert_eq!(target_query.parameters.len(), 2);
+        assert_eq!(target_query.parameters[0].name, "@id");
+        assert_eq!(
+            target_query.parameters[0].value,
+            serde_json::Value::Number(serde_json::Number::from(42))
+        );
+        assert_eq!(target_query.parameters[1].name, "@name");
+        assert_eq!(
+            target_query.parameters[1].value,
+            serde_json::Value::String("Contoso".to_string())
+        );
+
+        Ok(())
+    }
 }
diff --git a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs
index 6be5ccaeca..37771eee3f 100644
--- a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs
+++ b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs
@@ -42,6 +42,17 @@ pub struct MockItem {
 pub struct MockQueryEngine {
     /// An error to return when creating a pipeline.
     pub create_error: Mutex<Option<azure_core::Error>>,
+    /// Configuration for what kind of QueryRequest the pipeline should return.
+    pub query_request_config: Mutex<Option<QueryRequestConfig>>,
+}
+
+/// Configuration for controlling what QueryRequest objects the MockQueryPipeline returns.
+#[derive(Clone)]
+pub struct QueryRequestConfig {
+    /// The query to return in QueryRequest.query (None means no override)
+    pub query: Option<String>,
+    /// The value of include_parameters in the QueryRequest
+    pub include_parameters: bool,
 }
 
 impl MockQueryEngine {
@@ -49,6 +60,7 @@ impl MockQueryEngine {
     pub fn new() -> Self {
         Self {
            create_error: Mutex::new(None),
+           query_request_config: Mutex::new(None),
        }
    }
 
@@ -56,6 +68,15 @@ impl MockQueryEngine {
     pub fn with_error(error: azure_core::Error) -> Self {
         Self {
             create_error: Mutex::new(Some(error)),
+            query_request_config: Mutex::new(None),
+        }
+    }
+
+    /// Creates a MockQueryEngine with a specific QueryRequest configuration.
+    pub fn with_query_request_config(config: QueryRequestConfig) -> Self {
+        Self {
+            create_error: Mutex::new(None),
+            query_request_config: Mutex::new(Some(config)),
         }
     }
 }
@@ -77,7 +98,8 @@ impl QueryEngine for MockQueryEngine {
         let pkranges: PkRanges = serde_json::from_slice(pkranges)?;
 
         // Create a mock pipeline with the partition key ranges.
-        let pipeline = MockQueryPipeline::new(query.to_string(), pkranges.ranges);
+        let config = self.query_request_config.lock().unwrap().clone();
+        let pipeline = MockQueryPipeline::new(query.to_string(), pkranges.ranges, config);
         Ok(Box::new(pipeline))
     }
 
@@ -120,10 +142,15 @@ struct MockQueryPipeline {
     query: String,
     partitions: Vec<PartitionState>,
     completed: bool,
+    query_request_config: Option<QueryRequestConfig>,
 }
 
 impl MockQueryPipeline {
-    pub fn new(query: String, pkranges: Vec) -> Self {
+    pub fn new(
+        query: String,
+        pkranges: Vec,
+        config: Option<QueryRequestConfig>,
+    ) -> Self {
         let partitions = pkranges
             .into_iter()
             .map(|range| PartitionState {
@@ -138,6 +165,7 @@ impl MockQueryPipeline {
             query,
             partitions,
             completed: false,
+            query_request_config: config,
         }
     }
 
@@ -152,6 +180,15 @@ impl MockQueryPipeline {
                 } else {
                     None
                 },
+                query: self
+                    .query_request_config
+                    .as_ref()
+                    .and_then(|config| config.query.clone()),
+                include_parameters: self
+                    .query_request_config
+                    .as_ref()
+                    .map(|config| config.include_parameters)
+                    .unwrap_or(false),
             })
             .collect()
     }
diff --git a/sdk/cosmos/azure_data_cosmos/tests/query_engine.rs b/sdk/cosmos/azure_data_cosmos/tests/query_engine.rs
index ab7070da55..2427fd8081 100644
--- a/sdk/cosmos/azure_data_cosmos/tests/query_engine.rs
+++ b/sdk/cosmos/azure_data_cosmos/tests/query_engine.rs
@@ -79,3 +79,175 @@ pub async fn query_via_query_engine(
     account.cleanup().await?;
     Ok(())
 }
+
+#[recorded::test]
+pub async fn query_override_without_parameters(
+    context: TestContext,
+) -> Result<(), Box<dyn Error>> {
+    use framework::query_engine::{MockQueryEngine, QueryRequestConfig};
+
+    let account = TestAccount::from_env(context, None).await?;
+    let cosmos_client = account.connect_with_key(None)?;
+    let db_client = test_data::create_database(&account, &cosmos_client).await?;
+    let items = test_data::generate_mock_items(5, 5);
+    let container_client = test_data::create_container_with_items(
+        db_client,
+        items.clone(),
+        Some(ThroughputProperties::manual(40000)), // Force multiple physical partitions
+    )
+    .await?;
+
+    let query_config = QueryRequestConfig {
+        query: Some("SELECT * FROM c WHERE c.id = 'override'".to_string()),
+        include_parameters: false,
+    };
+    let query_engine = Arc::new(MockQueryEngine::with_query_request_config(query_config));
+
+    let original_query = azure_data_cosmos::Query::from("SELECT * FROM c WHERE c.id = @param1")
+        .with_parameter("@param1", "should_not_be_used")?;
+
+    let result_items: Vec<MockItem> = container_client
+        .query_items(
+            original_query,
+            (),
+            Some(QueryOptions {
+                query_engine: Some(query_engine),
+                ..Default::default()
+            }),
+        )?
+        .try_collect()
+        .await?;
+
+    // Since the override query looks for id = 'override' and our test data doesn't have items
+    // with that id, we should get no results. This proves the override query was used without parameters.
+    assert_eq!(0, result_items.len());
+
+    account.cleanup().await?;
+    Ok(())
+}
+
+#[recorded::test]
+pub async fn query_override_with_parameters(
+    context: TestContext,
+) -> Result<(), Box<dyn Error>> {
+    use framework::query_engine::{MockQueryEngine, QueryRequestConfig};
+
+    let account = TestAccount::from_env(context, None).await?;
+    let cosmos_client = account.connect_with_key(None)?;
+    let db_client = test_data::create_database(&account, &cosmos_client).await?;
+    let items = test_data::generate_mock_items(5, 5);
+    let container_client = test_data::create_container_with_items(
+        db_client,
+        items.clone(),
+        Some(ThroughputProperties::manual(40000)), // Force multiple physical partitions
+    )
+    .await?;
+
+    let query_config = QueryRequestConfig {
+        query: Some("SELECT * FROM c WHERE c.mergeOrder = @targetOrder".to_string()),
+        include_parameters: true,
+    };
+    let query_engine = Arc::new(MockQueryEngine::with_query_request_config(query_config));
+
+    let target_merge_order = items[0].merge_order;
+
+    let original_query =
+        azure_data_cosmos::Query::from("SELECT * FROM c WHERE c.id = @targetOrder")
+            .with_parameter("@targetOrder", target_merge_order)?;
+
+    let result_items: Vec<MockItem> = container_client
+        .query_items(
+            original_query,
+            (),
+            Some(QueryOptions {
+                query_engine: Some(query_engine),
+                ..Default::default()
+            }),
+        )?
+        .try_collect()
+        .await?;
+
+    // Since the override query uses "c.mergeOrder = @targetOrder" and we passed a valid merge order,
+    // we should get exactly the items that match that merge order. This proves the override query
+    // was used with the original parameters.
+    let expected_items: Vec<MockItem> = items
+        .into_iter()
+        .filter(|item| item.merge_order == target_merge_order)
+        .collect();
+
+    assert_eq!(expected_items.len(), result_items.len());
+    assert!(
+        !expected_items.is_empty(),
+        "Should have found at least one matching item"
+    );
+
+    for expected_item in expected_items {
+        assert!(result_items.contains(&expected_item));
+    }
+
+    account.cleanup().await?;
+    Ok(())
+}
+
+#[recorded::test]
+pub async fn no_query_override_uses_original(
+    context: TestContext,
+) -> Result<(), Box<dyn Error>> {
+    use framework::query_engine::{MockQueryEngine, QueryRequestConfig};
+
+    let account = TestAccount::from_env(context, None).await?;
+    let cosmos_client = account.connect_with_key(None)?;
+    let db_client = test_data::create_database(&account, &cosmos_client).await?;
+    let items = test_data::generate_mock_items(5, 5);
+    let container_client = test_data::create_container_with_items(
+        db_client,
+        items.clone(),
+        Some(ThroughputProperties::manual(40000)), // Force multiple physical partitions
+    )
+    .await?;
+
+    let query_config = QueryRequestConfig {
+        query: None,
+        include_parameters: false,
+    };
+    let query_engine = Arc::new(MockQueryEngine::with_query_request_config(query_config));
+
+    let target_merge_order = items[0].merge_order;
+
+    let original_query =
+        azure_data_cosmos::Query::from("SELECT * FROM c WHERE c.mergeOrder = @targetOrder")
+            .with_parameter("@targetOrder", target_merge_order)?;
+
+    let result_items: Vec<MockItem> = container_client
+        .query_items(
+            original_query,
+            (),
+            Some(QueryOptions {
+                query_engine: Some(query_engine),
+                ..Default::default()
+            }),
+        )?
+        .try_collect()
+        .await?;
+
+    // Since there's no query override, the original query "c.mergeOrder = @targetOrder" should be used
+    // with the original parameters, so we should get items matching the target merge order.
+    // This proves the original query and parameters were both preserved.
+    let expected_items: Vec<MockItem> = items
+        .into_iter()
+        .filter(|item| item.merge_order == target_merge_order)
+        .collect();
+
+    assert_eq!(expected_items.len(), result_items.len());
+    assert!(
+        !expected_items.is_empty(),
+        "Should have found at least one matching item"
+    );
+
+    for expected_item in expected_items {
+        assert!(result_items.contains(&expected_item));
+    }
+
+    account.cleanup().await?;
+    Ok(())
+}

From 7aa610a33520219f75cc471bcec1eab6ab70e6c2 Mon Sep 17 00:00:00 2001
From: Ashley Stanton-Nurse
Date: Thu, 9 Oct 2025 17:53:23 +0000
Subject: [PATCH 2/7] some random fixups?

---
 .../src/clients/cosmos_client.rs              |  2 +-
 .../azure_data_cosmos/tests/cosmos_query.rs   | 23 ++++++++++---------
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs
index 209fadc284..b7f8255cb8 100644
--- a/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs
@@ -110,7 +110,7 @@ impl CosmosClient {
     /// use azure_core::credentials::Secret;
     ///
     /// let client = CosmosClient::with_connection_string(
-    ///     "AccountEndpoint=https://accountname.documents.azure.com:443/;AccountKey=accountkey",
+    ///     Secret::from("AccountEndpoint=https://accountname.documents.azure.com:443/;AccountKey=accountkey"),
     ///     None)
     /// .unwrap();
     /// ```
diff --git a/sdk/cosmos/azure_data_cosmos/tests/cosmos_query.rs b/sdk/cosmos/azure_data_cosmos/tests/cosmos_query.rs
index 6180ad581b..a8d79ddc91 100644
--- a/sdk/cosmos/azure_data_cosmos/tests/cosmos_query.rs
+++ b/sdk/cosmos/azure_data_cosmos/tests/cosmos_query.rs
@@ -2,9 +2,9 @@
 
 use std::error::Error;
 
-use azure_core::{error::ErrorResponse, http::StatusCode};
+use azure_core::http::StatusCode;
 use azure_core_test::{recorded, TestContext};
-use azure_data_cosmos::Query;
+use azure_data_cosmos::{constants, Query};
 use framework::{test_data, MockItem, TestAccount};
 use futures::TryStreamExt;
 
@@ -156,15 +156,16 @@ pub async fn cross_partition_query_with_order_by_fails_without_query_engine(
     };
 
     assert_eq!(Some(StatusCode::BadRequest), err.http_status());
-    let error_response = ErrorResponse::try_from(err).expect("expected an HttpResponse error");
-    let message = error_response
-        .error
-        .expect("error should be present")
-        .message
-        .expect("message should be present");
-    assert!(message.starts_with(
-        "The provided cross partition query can not be directly served by the gateway."
-    ));
+    let response =
+        if let azure_core::error::ErrorKind::HttpResponse { raw_response, .. } = err.kind() {
+            raw_response.as_ref().unwrap().clone()
+        } else {
+            panic!("expected an HTTP response error");
+        };
+    let sub_status = response.headers().get_optional_str(&constants::SUB_STATUS);
+
+    // 1004 = CrossPartitionQueryNotServable
+    assert_eq!(Some("1004"), sub_status);
 
     account.cleanup().await?;
     Ok(())

From 779ae0c73ed55605c5466a11435e3e9b22cda4f4 Mon Sep 17 00:00:00 2001
From: Ashley Stanton-Nurse
Date: Thu, 9 Oct 2025 17:57:01 +0000
Subject: [PATCH 3/7] make some functions pub(crate) and other things private

---
 sdk/cosmos/azure_data_cosmos/src/query/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
index 92dbbcd4ec..f6ccfb0e27 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
@@ -111,7 +111,7 @@ impl Query {
     /// Replaces all parameters in this [`Query`] instance with the parameters from another [`Query`] instance, and returns it.
     ///
     /// Since the parameters in the other query are already serialized, this method cannot fail.
-    pub fn with_parameters_from(mut self, other: &Query) -> Self {
+    pub(crate) fn with_parameters_from(mut self, other: &Query) -> Self {
         self.parameters = other.parameters.clone();
         self
     }
@@ -141,7 +141,7 @@ impl<T: Into<String>> From<T> for Query {
 /// Represents a single parameter in a Cosmos DB query.
 #[derive(Clone, Debug, Serialize)]
-pub(crate) struct QueryParameter {
+struct QueryParameter {
     name: String,
     value: serde_json::Value,
 }

From 33ee3689eaaeaae131899a8e485d189311699e33 Mon Sep 17 00:00:00 2001
From: Ashley Stanton-Nurse
Date: Thu, 9 Oct 2025 20:47:55 +0000
Subject: [PATCH 4/7] drain and batching

---
 .../azure_data_cosmos/src/query/engine.rs     | 31 ++++++-
 .../azure_data_cosmos/src/query/executor.rs   | 59 ++++++++------
 .../tests/framework/query_engine.rs           | 81 +++++++++++--------
 3 files changed, 114 insertions(+), 57 deletions(-)

diff --git a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs
index 22a8c5c5cf..290d42496f 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs
@@ -3,6 +3,12 @@ pub struct QueryRequest {
     /// The ID of the partition key range to query.
     pub partition_key_range_id: String,
 
+    /// The index of this request, within the partition key range.
+    ///
+    /// This value will always increase for subsequent requests for the same partition key range.
+    /// It must be provided back to the pipeline when providing data, so that the pipeline can ensure that data is provided in order.
+    pub index: usize,
+
     /// The continuation to use, if any.
     pub continuation: Option<String>,
 
@@ -13,12 +19,26 @@ pub struct QueryRequest {
     ///
     /// Sometimes, when an override query is specified, it differs in structure from the original query, and the original parameters are not valid.
     pub include_parameters: bool,
+
+    /// If specified, indicates that the SDK should IMMEDIATELY drain all remaining results from this partition key range, following continuation tokens, until no more results are available.
+    /// All the data from this partition key range should be provided BEFORE any new items will be made available.
+    ///
+    /// This allows engines to optimize for non-streaming scenarios, where the entire result set must be provided to the engine before it can make progress.
+    pub drain: bool,
 }
 
 /// The request of a single-partition query for a specific partition key range.
 pub struct QueryResult<'a> {
+    /// The ID of the partition key range that was queried.
     pub partition_key_range_id: &'a str,
+
+    /// The index of the [`QueryRequest`] that generated this result.
+    pub request_index: usize,
+
+    /// The continuation token to be used for the next request, if any.
     pub next_continuation: Option<String>,
+
+    /// The raw body of the response from the query.
     pub result: &'a [u8],
 }
 
@@ -46,7 +66,16 @@ pub trait QueryPipeline: Send {
     fn run(&mut self) -> azure_core::Result<PipelineResult>;
 
     /// Provides additional single-partition data to the pipeline.
-    fn provide_data(&mut self, data: QueryResult) -> azure_core::Result<()>;
+    ///
+    /// Data from multiple partition ranges may be provided at once.
+    /// However, each page of data must be provided in order.
+    /// So, for any given partition key range, page n's results must be earlier in the `data` vector than page n+1's results.
+    /// Data from different partition key ranges may be interleaved, as long as each partition key range's pages are in order.
+    ///
+    /// The pipeline will use the [`QueryResult::request_index`] field to validate this.
+    ///
+    /// When providing data from a draining request (i.e. a request with `drain = true`), all pages for that draining request can share the same [`QueryResult::request_index`].
+    fn provide_data(&mut self, data: Vec<QueryResult>) -> azure_core::Result<()>;
 }
 
 /// Provides an interface to a query engine, which constructs query pipelines.
diff --git a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
index e2ed9908db..48349c5a2f 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
@@ -128,7 +128,8 @@ impl QueryExecutor {
         }
 
         // No items, so make any requests we need to make and provide them to the pipeline.
-        for request in results.requests {
+        // TODO: We can absolutely parallelize these requests.
+        for mut request in results.requests {
             let mut query_request = if let Some(query) = request.query {
                 let mut query = Query::from(query);
                 if request.include_parameters {
@@ -146,30 +147,42 @@ impl QueryExecutor {
                 constants::PARTITION_KEY_RANGE_ID,
                 request.partition_key_range_id.clone(),
             );
-            if let Some(continuation) = request.continuation {
-                query_request.insert_header(constants::CONTINUATION, continuation);
-            }
-
-            let resp = self
-                .http_pipeline
-                .send_raw(
-                    self.context.to_borrowed(),
-                    &mut query_request,
-                    self.items_link.clone(),
-                )
-                .await?;
-
-            let next_continuation =
-                resp.headers().get_optional_string(&constants::CONTINUATION);
-            let body = resp.into_body();
-
-            let result = QueryResult {
-                partition_key_range_id: &request.partition_key_range_id,
-                next_continuation,
-                result: &body,
-            };
+            let mut draining = true;
+            while draining {
+                if let Some(c) = request.continuation.clone() {
+                    query_request.insert_header(constants::CONTINUATION, c);
+                } else {
+                    // Make sure we don't send a continuation header if we don't have one, even if we did on a previous iteration.
+ query_request.headers_mut().remove(constants::CONTINUATION); + } - pipeline.provide_data(result)?; + let resp = self + .http_pipeline + .send_raw( + self.context.to_borrowed(), + &mut query_request, + self.items_link.clone(), + ) + .await?; + + let next_continuation = + resp.headers().get_optional_string(&constants::CONTINUATION); + + draining = request.drain && next_continuation.is_some(); + + let body = resp.into_body(); + let result = QueryResult { + partition_key_range_id: &request.partition_key_range_id, + request_index: request.index, + next_continuation, + result: &body, + }; + + // For now, just provide a single result at a time. + // When we parallelize requests, we can more easily provide multiple results at once. + pipeline.provide_data(vec![result])?; + } } // No items, but we provided more data (probably), so continue the loop. diff --git a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs index 37771eee3f..7c34f7c801 100644 --- a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs +++ b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs @@ -114,6 +114,7 @@ struct PartitionState { started: bool, queue: VecDeque, next_continuation: Option, + next_index: usize, } impl PartitionState { @@ -158,6 +159,7 @@ impl MockQueryPipeline { started: false, queue: VecDeque::new(), next_continuation: None, + next_index: 0, }) .collect(); @@ -170,25 +172,26 @@ impl MockQueryPipeline { } fn get_requests(&self) -> Vec { + let (query, include_parameters) = if let Some(config) = &self.query_request_config { + (config.query.clone(), config.include_parameters) + } else { + (None, false) + }; + self.partitions .iter() .filter(|state| !state.exhausted()) - .map(|state| azure_data_cosmos::query::QueryRequest { + .map(move |state| azure_data_cosmos::query::QueryRequest { partition_key_range_id: state.range.id.clone(), + index: state.next_index, continuation: if state.started { state.next_continuation.clone() } else { None }, - query: self - .query_request_config - .as_ref() - .and_then(|config| config.query.clone()), - include_parameters: self - .query_request_config - .as_ref() - .map(|config| config.include_parameters) - .unwrap_or(false), + query: query.clone(), + include_parameters, + drain: false, }) .collect() } @@ -266,31 +269,43 @@ impl QueryPipeline for MockQueryPipeline { fn provide_data( &mut self, - data: azure_data_cosmos::query::QueryResult, + data: std::vec::Vec>, ) -> azure_core::Result<()> { - let payload: DocumentPayload = - serde_json::from_slice(data.result).map_err(|_| { - azure_core::Error::with_message( + for data in data { + let payload: DocumentPayload = + serde_json::from_slice(data.result).map_err(|_| { + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "Failed to deserialize payload", + ) + })?; + + let partition_state = self + .partitions + .iter_mut() + .find(|state| state.range.id == data.partition_key_range_id); + if let Some(partition_state) = partition_state { + if partition_state.next_index != data.request_index { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + format!( + "Out of order data provided for partition key range {}: expected index {}, got {}", + data.partition_key_range_id, partition_state.next_index, data.request_index + ), + )); + } + partition_state.next_index += 1; + partition_state.provide_data(payload.documents, data.next_continuation); + } else { + return Err(azure_core::Error::with_message( 
azure_core::error::ErrorKind::Other, - "Failed to deserialize payload", - ) - })?; - - let partition_state = self - .partitions - .iter_mut() - .find(|state| state.range.id == data.partition_key_range_id); - if let Some(partition_state) = partition_state { - partition_state.provide_data(payload.documents, data.next_continuation); - Ok(()) - } else { - Err(azure_core::Error::with_message( - azure_core::error::ErrorKind::Other, - format!( - "Partition key range {} not found", - data.partition_key_range_id - ), - )) + format!( + "Partition key range {} not found", + data.partition_key_range_id + ), + )); + } } + Ok(()) } } From a677361a7366305c874ff44ff3eb7e686b81dd73 Mon Sep 17 00:00:00 2001 From: Ashley Stanton-Nurse Date: Thu, 9 Oct 2025 20:53:31 +0000 Subject: [PATCH 5/7] only add with_parameters_from when in preview_query_engine feature --- sdk/cosmos/azure_data_cosmos/src/query/executor.rs | 2 +- sdk/cosmos/azure_data_cosmos/src/query/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs index 48349c5a2f..d38a70d447 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs @@ -129,7 +129,7 @@ impl QueryExecutor { // No items, so make any requests we need to make and provide them to the pipeline. // TODO: We can absolutely parallelize these requests. - for mut request in results.requests { + for request in results.requests { let mut query_request = if let Some(query) = request.query { let mut query = Query::from(query); if request.include_parameters { diff --git a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs index f6ccfb0e27..b2853e5008 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs @@ -111,6 +111,7 @@ impl Query { /// Replaces all parameters in this [`Query`] instance with the parameters from another [`Query`] instance, and returns it. /// /// Since the parameters in the other query are already serialized, this method cannot fail. 
+    #[cfg(feature = "preview_query_engine")]
     pub(crate) fn with_parameters_from(mut self, other: &Query) -> Self {
         self.parameters = other.parameters.clone();
         self

From 905f26560ce3d598289ce01e6c0b9e46a8e38b3b Mon Sep 17 00:00:00 2001
From: Ashley Stanton-Nurse
Date: Fri, 10 Oct 2025 17:12:20 +0000
Subject: [PATCH 6/7] pr feedback and test fix

---
 sdk/cosmos/azure_data_cosmos/src/query/executor.rs     | 6 +++---
 sdk/cosmos/azure_data_cosmos/src/query/mod.rs           | 3 ++-
 .../azure_data_cosmos/tests/framework/query_engine.rs   | 7 ++-----
 3 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
index d38a70d447..ce3d95fbaf 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs
@@ -148,8 +148,8 @@ impl QueryExecutor {
                 request.partition_key_range_id.clone(),
             );
 
-            let mut draining = true;
-            while draining {
+            let mut fetch_more_pages = true;
+            while fetch_more_pages {
                 if let Some(c) = request.continuation.clone() {
                     query_request.insert_header(constants::CONTINUATION, c);
                 } else {
@@ -169,7 +169,7 @@ impl QueryExecutor {
                 let next_continuation =
                     resp.headers().get_optional_string(&constants::CONTINUATION);
 
-                draining = request.drain && next_continuation.is_some();
+                fetch_more_pages = request.drain && next_continuation.is_some();
 
                 let body = resp.into_body();
                 let result = QueryResult {
diff --git a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
index b2853e5008..51909f05ae 100644
--- a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs
@@ -111,7 +111,7 @@ impl Query {
     /// Replaces all parameters in this [`Query`] instance with the parameters from another [`Query`] instance, and returns it.
     ///
     /// Since the parameters in the other query are already serialized, this method cannot fail.
-    #[cfg(feature = "preview_query_engine")]
+    #[cfg(feature = "preview_query_engine")] // Crate-private for now, and thus only in the preview_query_engine feature (which is the only place it's used).
     pub(crate) fn with_parameters_from(mut self, other: &Query) -> Self {
         self.parameters = other.parameters.clone();
         self
@@ -259,6 +259,7 @@ mod tests {
     }
 
     #[test]
+    #[cfg(feature = "preview_query_engine")]
     pub fn with_parameters_from_replaces_all_parameters() -> Result<(), Box<dyn std::error::Error>> {
         let source_query = Query::from("SELECT * FROM c")
             .with_parameter("@id", 42)?
diff --git a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs
index 7c34f7c801..1a9d92534f 100644
--- a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs
+++ b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs
@@ -5,7 +5,7 @@
 use std::{collections::VecDeque, sync::Mutex};
 
 use serde::{Deserialize, Serialize};
-use azure_data_cosmos::query::{PipelineResult, QueryEngine, QueryPipeline};
+use azure_data_cosmos::query::{PipelineResult, QueryEngine, QueryPipeline, QueryResult};
 use serde_json::value::RawValue;
 
 #[derive(Deserialize)]
@@ -267,10 +267,7 @@ impl QueryPipeline for MockQueryPipeline {
         })
     }
 
-    fn provide_data(
-        &mut self,
-        data: std::vec::Vec<azure_data_cosmos::query::QueryResult<'_>>,
-    ) -> azure_core::Result<()> {
+    fn provide_data(&mut self, data: Vec<QueryResult<'_>>) -> azure_core::Result<()> {
         for data in data {
             let payload: DocumentPayload =
                 serde_json::from_slice(data.result).map_err(|_| {
                     azure_core::Error::with_message(

From 85130453b9f4eb213da2b2a8c5378e47ef6f6317 Mon Sep 17 00:00:00 2001
From: Ashley Stanton-Nurse
Date: Mon, 27 Oct 2025 17:41:05 +0000
Subject: [PATCH 7/7] update changelog

---
 sdk/cosmos/azure_data_cosmos/CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md
index c4ff3f5940..9583c34afb 100644
--- a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 ### Features Added
 
+- Adjusted the query engine abstraction to support future enhancements and optimizations. ([#3166](https://github.com/Azure/azure-sdk-for-rust/pull/3166))
+
 ### Breaking Changes
 
 ### Bugs Fixed
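
Illustrative sketch (not part of the patches above): one way a custom QueryEngine implementation might populate the new QueryRequest fields when it wants an engine-rewritten per-partition query drained ahead of other work, assuming the preview_query_engine feature is enabled. The helper name and the rewritten query text are hypothetical; only QueryRequest and its fields come from the changes above.

    use azure_data_cosmos::query::QueryRequest;

    /// Builds a request asking the SDK to run a rewritten query against one partition
    /// key range and to follow continuation tokens until that range is exhausted.
    fn drain_request(pkrange_id: &str, rewritten_query: String, next_index: usize) -> QueryRequest {
        QueryRequest {
            partition_key_range_id: pkrange_id.to_string(),
            // Must increase for each subsequent request on the same range; the pipeline
            // checks it (via QueryResult::request_index) to ensure pages arrive in order.
            index: next_index,
            // No continuation on the first request; with drain = true the SDK follows
            // continuations itself and feeds every page back before surfacing new items.
            continuation: None,
            // The rewritten query differs in structure from the original query, so the
            // original parameters are not forwarded.
            query: Some(rewritten_query),
            include_parameters: false,
            drain: true,
        }
    }

The pages fetched for such a request come back through QueryPipeline::provide_data as QueryResult values sharing that request_index, which is the ordering the mock pipeline in the tests validates.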