Skip to content

Commit b22b948

Browse files
committed
rest_api support fn query_row_batch
1 parent 53e0857 commit b22b948

File tree

3 files changed

+64
-8
lines changed

3 files changed

+64
-8
lines changed

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ members = [
88
"cli",
99
"bindings/python",
1010
"bindings/nodejs",
11-
"bindings/java",
1211
]
1312
resolver = "2"
1413

core/src/response.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use crate::error_code::ErrorCode;
1616
use crate::session::SessionState;
17-
use serde::Deserialize;
17+
use serde::{Deserialize, Serialize};
1818

1919
#[derive(Deserialize, Debug)]
2020
pub struct QueryStats {
@@ -53,7 +53,7 @@ pub struct ProgressValues {
5353
pub bytes: usize,
5454
}
5555

56-
#[derive(Deserialize, Debug, Clone)]
56+
#[derive(Serialize, Deserialize, Debug, Clone)]
5757
pub struct SchemaField {
5858
pub name: String,
5959
#[serde(rename = "type")]

driver/src/rest_api.rs

+62-5
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use tokio::fs::File;
2727
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2828
use tokio_stream::Stream;
2929

30-
use databend_client::APIClient;
3130
use databend_client::PresignedResponse;
3231
use databend_client::QueryResponse;
32+
use databend_client::{APIClient, SchemaField};
3333
use databend_driver_core::error::{Error, Result};
3434
use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
3535
use databend_driver_core::schema::{Schema, SchemaRef};
@@ -82,7 +82,7 @@ impl Connection for RestAPIConnection {
8282
async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator> {
8383
info!("query iter ext: {}", sql);
8484
let resp = self.client.start_query(sql).await?;
85-
let resp = self.wait_for_schema(resp).await?;
85+
let resp = self.wait_for_schema(resp, true).await?;
8686
let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?;
8787
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
8888
}
@@ -209,8 +209,14 @@ impl<'o> RestAPIConnection {
209209
})
210210
}
211211

212-
async fn wait_for_schema(&self, resp: QueryResponse) -> Result<QueryResponse> {
213-
if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress()
212+
async fn wait_for_schema(
213+
&self,
214+
resp: QueryResponse,
215+
return_on_progress: bool,
216+
) -> Result<QueryResponse> {
217+
if !resp.data.is_empty()
218+
|| !resp.schema.is_empty()
219+
|| (return_on_progress && resp.stats.progresses.has_progress())
214220
{
215221
return Ok(resp);
216222
}
@@ -228,7 +234,7 @@ impl<'o> RestAPIConnection {
228234

229235
if !result.data.is_empty()
230236
|| !result.schema.is_empty()
231-
|| result.stats.progresses.has_progress()
237+
|| (return_on_progress && result.stats.progresses.has_progress())
232238
{
233239
break;
234240
}
@@ -250,6 +256,12 @@ impl<'o> RestAPIConnection {
250256
fn default_copy_options() -> BTreeMap<&'o str, &'o str> {
251257
vec![("purge", "true")].into_iter().collect()
252258
}
259+
260+
pub async fn query_row_batch(&self, sql: &str) -> Result<RowBatch> {
261+
let resp = self.client.start_query(sql).await?;
262+
let resp = self.wait_for_schema(resp, false).await?;
263+
Ok(RowBatch::from_response(self.client.clone(), resp)?)
264+
}
253265
}
254266

255267
type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
@@ -341,3 +353,48 @@ impl Stream for RestAPIRows {
341353
}
342354
}
343355
}
356+
357+
pub struct RowBatch {
358+
schema: Vec<SchemaField>,
359+
client: Arc<APIClient>,
360+
query_id: String,
361+
node_id: Option<String>,
362+
363+
next_uri: Option<String>,
364+
data: Vec<Vec<Option<String>>>,
365+
}
366+
367+
impl RowBatch {
368+
pub fn schema(&self) -> Vec<SchemaField> {
369+
self.schema.clone()
370+
}
371+
372+
fn from_response(client: Arc<APIClient>, mut resp: QueryResponse) -> Result<Self> {
373+
Ok(Self {
374+
schema: std::mem::take(&mut resp.schema),
375+
client,
376+
query_id: resp.id,
377+
node_id: resp.node_id,
378+
next_uri: resp.next_uri,
379+
data: resp.data,
380+
})
381+
}
382+
383+
pub async fn fetch_next_page(&mut self) -> Result<Vec<Vec<Option<String>>>> {
384+
if !self.data.is_empty() {
385+
return Ok(std::mem::take(&mut self.data));
386+
}
387+
while let Some(next_uri) = &self.next_uri {
388+
let resp = self
389+
.client
390+
.query_page(&self.query_id, &next_uri, &self.node_id)
391+
.await?;
392+
393+
self.next_uri = resp.next_uri;
394+
if !resp.data.is_empty() {
395+
return Ok(resp.data);
396+
}
397+
}
398+
Ok(vec![])
399+
}
400+
}

0 commit comments

Comments
 (0)