Skip to content

Commit 7d49de3

Browse files
authored
feat: rest_api support fn query_row_batch (#555)
* feat: rest_api support fn query_row_batch * rm integration for py3.7
1 parent 987298f commit 7d49de3

File tree

4 files changed

+68
-10
lines changed

4 files changed

+68
-10
lines changed

.github/workflows/bindings.python.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ jobs:
108108
runs-on: ubuntu-latest
109109
strategy:
110110
matrix:
111-
pyver: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
111+
pyver: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
112112
steps:
113113
- uses: actions/checkout@v4
114114
- name: Setup Python

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.vscode
22
.idea
3-
target/
3+
**/target
44
Cargo.lock
55
venv/
66

@@ -9,3 +9,4 @@ venv/
99

1010
/dist
1111

12+
**/.DS_Store

core/src/response.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use crate::error_code::ErrorCode;
1616
use crate::session::SessionState;
17-
use serde::Deserialize;
17+
use serde::{Deserialize, Serialize};
1818

1919
#[derive(Deserialize, Debug)]
2020
pub struct QueryStats {
@@ -53,7 +53,7 @@ pub struct ProgressValues {
5353
pub bytes: usize,
5454
}
5555

56-
#[derive(Deserialize, Debug, Clone)]
56+
#[derive(Serialize, Deserialize, Debug, Clone)]
5757
pub struct SchemaField {
5858
pub name: String,
5959
#[serde(rename = "type")]

driver/src/rest_api.rs

+63-6
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ use tokio::fs::File;
2929
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3030
use tokio_stream::Stream;
3131

32-
use databend_client::APIClient;
3332
use databend_client::PresignedResponse;
3433
use databend_client::QueryResponse;
34+
use databend_client::{APIClient, SchemaField};
3535
use databend_driver_core::error::{Error, Result};
3636
use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
3737
use databend_driver_core::schema::{Schema, SchemaRef};
@@ -84,7 +84,7 @@ impl Connection for RestAPIConnection {
8484
async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator> {
8585
info!("query iter ext: {}", sql);
8686
let resp = self.client.start_query(sql).await?;
87-
let resp = self.wait_for_schema(resp).await?;
87+
let resp = self.wait_for_schema(resp, true).await?;
8888
let (schema, rows) = RestAPIRows::<RowWithStats>::from_response(self.client.clone(), resp)?;
8989
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
9090
}
@@ -93,7 +93,7 @@ impl Connection for RestAPIConnection {
9393
async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
9494
info!("query raw iter: {}", sql);
9595
let resp = self.client.start_query(sql).await?;
96-
let resp = self.wait_for_schema(resp).await?;
96+
let resp = self.wait_for_schema(resp, true).await?;
9797
let (schema, rows) =
9898
RestAPIRows::<RawRowWithStats>::from_response(self.client.clone(), resp)?;
9999
Ok(RawRowIterator::new(Arc::new(schema), Box::pin(rows)))
@@ -221,8 +221,14 @@ impl<'o> RestAPIConnection {
221221
})
222222
}
223223

224-
async fn wait_for_schema(&self, resp: QueryResponse) -> Result<QueryResponse> {
225-
if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress()
224+
async fn wait_for_schema(
225+
&self,
226+
resp: QueryResponse,
227+
return_on_progress: bool,
228+
) -> Result<QueryResponse> {
229+
if !resp.data.is_empty()
230+
|| !resp.schema.is_empty()
231+
|| (return_on_progress && resp.stats.progresses.has_progress())
226232
{
227233
return Ok(resp);
228234
}
@@ -240,7 +246,7 @@ impl<'o> RestAPIConnection {
240246

241247
if !result.data.is_empty()
242248
|| !result.schema.is_empty()
243-
|| result.stats.progresses.has_progress()
249+
|| (return_on_progress && result.stats.progresses.has_progress())
244250
{
245251
break;
246252
}
@@ -262,6 +268,12 @@ impl<'o> RestAPIConnection {
262268
fn default_copy_options() -> BTreeMap<&'o str, &'o str> {
263269
vec![("purge", "true")].into_iter().collect()
264270
}
271+
272+
pub async fn query_row_batch(&self, sql: &str) -> Result<RowBatch> {
273+
let resp = self.client.start_query(sql).await?;
274+
let resp = self.wait_for_schema(resp, false).await?;
275+
RowBatch::from_response(self.client.clone(), resp)
276+
}
265277
}
266278

267279
type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
@@ -380,3 +392,48 @@ impl FromRowStats for RawRowWithStats {
380392
Ok(RawRowWithStats::Row(RawRow::new(rows, row)))
381393
}
382394
}
395+
396+
pub struct RowBatch {
397+
schema: Vec<SchemaField>,
398+
client: Arc<APIClient>,
399+
query_id: String,
400+
node_id: Option<String>,
401+
402+
next_uri: Option<String>,
403+
data: Vec<Vec<Option<String>>>,
404+
}
405+
406+
impl RowBatch {
407+
pub fn schema(&self) -> Vec<SchemaField> {
408+
self.schema.clone()
409+
}
410+
411+
fn from_response(client: Arc<APIClient>, mut resp: QueryResponse) -> Result<Self> {
412+
Ok(Self {
413+
schema: std::mem::take(&mut resp.schema),
414+
client,
415+
query_id: resp.id,
416+
node_id: resp.node_id,
417+
next_uri: resp.next_uri,
418+
data: resp.data,
419+
})
420+
}
421+
422+
pub async fn fetch_next_page(&mut self) -> Result<Vec<Vec<Option<String>>>> {
423+
if !self.data.is_empty() {
424+
return Ok(std::mem::take(&mut self.data));
425+
}
426+
while let Some(next_uri) = &self.next_uri {
427+
let resp = self
428+
.client
429+
.query_page(&self.query_id, next_uri, &self.node_id)
430+
.await?;
431+
432+
self.next_uri = resp.next_uri;
433+
if !resp.data.is_empty() {
434+
return Ok(resp.data);
435+
}
436+
}
437+
Ok(vec![])
438+
}
439+
}

0 commit comments

Comments
 (0)