Skip to content

Commit 1fa13e9

Browse files
authored
feat: add ttc member (tcp testing container) (#554)
* feat: add ttc member (tcp testing container) * chore(cli): fix * chore(cli): fix * chore(cli): fix * update * update * update * update * update * update * update
1 parent 53417f0 commit 1fa13e9

File tree

13 files changed

+530
-12
lines changed

13 files changed

+530
-12
lines changed

.dockerignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
target/
2+
frontend
3+
ttc/Dockerfile

.github/workflows/ttc.yml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# .github/workflows/docker-publish.yml
2+
3+
name: Docker
4+
5+
on:
6+
push:
7+
branches:
8+
- main
9+
paths:
10+
- 'core/**'
11+
- 'driver/**'
12+
- 'ttc/**'
13+
14+
jobs:
15+
docker:
16+
runs-on: ubuntu-latest
17+
18+
steps:
19+
- name: Checkout code
20+
uses: actions/checkout@v2
21+
22+
- name: Set up Docker Buildx
23+
uses: docker/setup-buildx-action@v1
24+
25+
- name: Login to DockerHub
26+
uses: docker/login-action@v3
27+
with:
28+
username: ${{ secrets.DOCKERHUB_USERNAME }}
29+
password: ${{ secrets.DOCKERHUB_TOKEN }}
30+
31+
- name: Build and push
32+
uses: docker/build-push-action@v2
33+
with:
34+
push: true
35+
file: ./ttc/Dockerfile
36+
tags: datafuselabs/ttc-rust:latest

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ members = [
88
"cli",
99
"bindings/python",
1010
"bindings/nodejs",
11+
"ttc",
1112
]
1213
resolver = "2"
1314

driver/src/conn.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::path::Path;
1717
use std::sync::Arc;
1818

1919
use async_trait::async_trait;
20+
use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
2021
use once_cell::sync::Lazy;
2122
use tokio::io::AsyncRead;
2223
use tokio_stream::StreamExt;
@@ -121,6 +122,19 @@ pub trait Connection: Send + Sync {
121122
rows.collect().await
122123
}
123124

125+
// raw data response query, only for test
126+
async fn query_raw_iter(&self, _sql: &str) -> Result<RawRowIterator> {
127+
Err(Error::BadArgument(
128+
"Unsupported implement query_raw_iter".to_string(),
129+
))
130+
}
131+
132+
// raw data response query, only for test
133+
async fn query_raw_all(&self, sql: &str) -> Result<Vec<RawRow>> {
134+
let rows = self.query_raw_iter(sql).await?;
135+
rows.collect().await
136+
}
137+
124138
/// Get presigned url for a given operation and stage location.
125139
/// The operation can be "UPLOAD" or "DOWNLOAD".
126140
async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result<PresignedResponse>;

driver/src/rest_api.rs

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
use std::collections::{BTreeMap, VecDeque};
1616
use std::future::Future;
1717
use std::io::Cursor;
18+
use std::marker::PhantomData;
1819
use std::path::Path;
1920
use std::pin::Pin;
2021
use std::sync::Arc;
2122
use std::task::{Context, Poll};
2223

2324
use async_compression::tokio::write::ZstdEncoder;
2425
use async_trait::async_trait;
26+
use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats};
2527
use log::info;
2628
use tokio::fs::File;
2729
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -83,10 +85,20 @@ impl Connection for RestAPIConnection {
8385
info!("query iter ext: {}", sql);
8486
let resp = self.client.start_query(sql).await?;
8587
let resp = self.wait_for_schema(resp).await?;
86-
let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?;
88+
let (schema, rows) = RestAPIRows::<RowWithStats>::from_response(self.client.clone(), resp)?;
8789
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
8890
}
8991

92+
// raw data response query, only for test
93+
async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
94+
info!("query raw iter: {}", sql);
95+
let resp = self.client.start_query(sql).await?;
96+
let resp = self.wait_for_schema(resp).await?;
97+
let (schema, rows) =
98+
RestAPIRows::<RawRowWithStats>::from_response(self.client.clone(), resp)?;
99+
Ok(RawRowIterator::new(Arc::new(schema), Box::pin(rows)))
100+
}
101+
90102
async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result<PresignedResponse> {
91103
info!("get presigned url: {} {}", operation, stage);
92104
let sql = format!("PRESIGN {} {}", operation, stage);
@@ -254,7 +266,7 @@ impl<'o> RestAPIConnection {
254266

255267
type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
256268

257-
pub struct RestAPIRows {
269+
pub struct RestAPIRows<T> {
258270
client: Arc<APIClient>,
259271
schema: SchemaRef,
260272
data: VecDeque<Vec<Option<String>>>,
@@ -263,9 +275,10 @@ pub struct RestAPIRows {
263275
node_id: Option<String>,
264276
next_uri: Option<String>,
265277
next_page: Option<PageFut>,
278+
_phantom: std::marker::PhantomData<T>,
266279
}
267280

268-
impl RestAPIRows {
281+
impl<T> RestAPIRows<T> {
269282
fn from_response(client: Arc<APIClient>, resp: QueryResponse) -> Result<(Schema, Self)> {
270283
let schema: Schema = resp.schema.try_into()?;
271284
let rows = Self {
@@ -277,24 +290,25 @@ impl RestAPIRows {
277290
data: resp.data.into(),
278291
stats: Some(ServerStats::from(resp.stats)),
279292
next_page: None,
293+
_phantom: PhantomData,
280294
};
281295
Ok((schema, rows))
282296
}
283297
}
284298

285-
impl Stream for RestAPIRows {
286-
type Item = Result<RowWithStats>;
299+
impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
300+
type Item = Result<T>;
287301

288302
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
289303
if let Some(ss) = self.stats.take() {
290-
return Poll::Ready(Some(Ok(RowWithStats::Stats(ss))));
304+
return Poll::Ready(Some(Ok(T::from_stats(ss))));
291305
}
292306
// Skip to fetch next page if there is only one row left in buffer.
293307
// Therefore we could guarantee the `/final` called before the last row.
294308
if self.data.len() > 1 {
295309
if let Some(row) = self.data.pop_front() {
296-
let row = Row::try_from((self.schema.clone(), row))?;
297-
return Poll::Ready(Some(Ok(RowWithStats::Row(row))));
310+
let row = T::try_from_row(row, self.schema.clone())?;
311+
return Poll::Ready(Some(Ok(row)));
298312
}
299313
}
300314
match self.next_page {
@@ -307,8 +321,7 @@ impl Stream for RestAPIRows {
307321
self.next_page = None;
308322
let mut new_data = resp.data.into();
309323
self.data.append(&mut new_data);
310-
let stats = ServerStats::from(resp.stats);
311-
Poll::Ready(Some(Ok(RowWithStats::Stats(stats))))
324+
Poll::Ready(Some(Ok(T::from_stats(resp.stats.into()))))
312325
}
313326
Poll::Ready(Err(e)) => {
314327
self.next_page = None;
@@ -332,12 +345,38 @@ impl Stream for RestAPIRows {
332345
}
333346
None => match self.data.pop_front() {
334347
Some(row) => {
335-
let row = Row::try_from((self.schema.clone(), row))?;
336-
Poll::Ready(Some(Ok(RowWithStats::Row(row))))
348+
let row = T::try_from_row(row, self.schema.clone())?;
349+
Poll::Ready(Some(Ok(row)))
337350
}
338351
None => Poll::Ready(None),
339352
},
340353
},
341354
}
342355
}
343356
}
357+
358+
trait FromRowStats: Send + Sync + Clone {
359+
fn from_stats(stats: ServerStats) -> Self;
360+
fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self>;
361+
}
362+
363+
impl FromRowStats for RowWithStats {
364+
fn from_stats(stats: ServerStats) -> Self {
365+
RowWithStats::Stats(stats)
366+
}
367+
368+
fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
369+
Ok(RowWithStats::Row(Row::try_from((schema, row))?))
370+
}
371+
}
372+
373+
impl FromRowStats for RawRowWithStats {
374+
fn from_stats(stats: ServerStats) -> Self {
375+
RawRowWithStats::Stats(stats)
376+
}
377+
378+
fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
379+
let rows = Row::try_from((schema, row.clone()))?;
380+
Ok(RawRowWithStats::Row(RawRow::new(rows, row)))
381+
}
382+
}

licenserc.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ excludes = [
1414
".prettierignore",
1515

1616
"LICENSE",
17+
"Dockerfile",
1718
"Makefile",
1819

1920
# docs and generated files

sql/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod cursor_ext;
1616
pub mod error;
17+
pub mod raw_rows;
1718
pub mod rows;
1819
pub mod schema;
1920
pub mod value;

sql/src/raw_rows.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::pin::Pin;
16+
use std::task::Context;
17+
use std::task::Poll;
18+
19+
use tokio_stream::{Stream, StreamExt};
20+
21+
use crate::error::Error;
22+
use crate::error::Result;
23+
use crate::rows::Row;
24+
use crate::rows::ServerStats;
25+
use crate::schema::SchemaRef;
26+
use crate::value::Value;
27+
28+
#[derive(Clone, Debug)]
29+
pub enum RawRowWithStats {
30+
Row(RawRow),
31+
Stats(ServerStats),
32+
}
33+
34+
#[derive(Clone, Debug, Default)]
35+
pub struct RawRow {
36+
pub row: Row,
37+
pub raw_row: Vec<Option<String>>,
38+
}
39+
40+
impl RawRow {
41+
pub fn new(row: Row, raw_row: Vec<Option<String>>) -> Self {
42+
Self { row, raw_row }
43+
}
44+
45+
pub fn len(&self) -> usize {
46+
self.raw_row.len()
47+
}
48+
49+
pub fn is_empty(&self) -> bool {
50+
self.raw_row.is_empty()
51+
}
52+
53+
pub fn values(&self) -> &[Option<String>] {
54+
&self.raw_row
55+
}
56+
57+
pub fn schema(&self) -> SchemaRef {
58+
self.row.schema()
59+
}
60+
}
61+
62+
impl TryFrom<(SchemaRef, Vec<Option<String>>)> for RawRow {
63+
type Error = Error;
64+
65+
fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
66+
let mut values: Vec<Value> = Vec::new();
67+
for (i, field) in schema.fields().iter().enumerate() {
68+
let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
69+
values.push(Value::try_from((&field.data_type, val))?);
70+
}
71+
72+
let row = Row::new(schema, values);
73+
Ok(RawRow::new(row, data))
74+
}
75+
}
76+
77+
impl IntoIterator for RawRow {
78+
type Item = Option<String>;
79+
type IntoIter = std::vec::IntoIter<Self::Item>;
80+
81+
fn into_iter(self) -> Self::IntoIter {
82+
self.raw_row.into_iter()
83+
}
84+
}
85+
86+
#[derive(Clone, Debug)]
87+
pub struct RawRows {
88+
rows: Vec<RawRow>,
89+
}
90+
91+
impl RawRows {
92+
pub fn new(rows: Vec<RawRow>) -> Self {
93+
Self { rows }
94+
}
95+
96+
pub fn rows(&self) -> &[RawRow] {
97+
&self.rows
98+
}
99+
100+
pub fn len(&self) -> usize {
101+
self.rows.len()
102+
}
103+
104+
pub fn is_empty(&self) -> bool {
105+
self.rows.is_empty()
106+
}
107+
}
108+
109+
impl IntoIterator for RawRows {
110+
type Item = RawRow;
111+
type IntoIter = std::vec::IntoIter<Self::Item>;
112+
113+
fn into_iter(self) -> Self::IntoIter {
114+
self.rows.into_iter()
115+
}
116+
}
117+
118+
pub struct RawRowIterator {
119+
schema: SchemaRef,
120+
it: Pin<Box<dyn Stream<Item = Result<RawRow>> + Send>>,
121+
}
122+
123+
impl RawRowIterator {
124+
pub fn new(
125+
schema: SchemaRef,
126+
it: Pin<Box<dyn Stream<Item = Result<RawRowWithStats>> + Send>>,
127+
) -> Self {
128+
let it = it.filter_map(|r| match r {
129+
Ok(RawRowWithStats::Row(r)) => Some(Ok(r)),
130+
Ok(_) => None,
131+
Err(err) => Some(Err(err)),
132+
});
133+
Self {
134+
schema,
135+
it: Box::pin(it),
136+
}
137+
}
138+
139+
pub fn schema(&self) -> SchemaRef {
140+
self.schema.clone()
141+
}
142+
}
143+
144+
impl Stream for RawRowIterator {
145+
type Item = Result<RawRow>;
146+
147+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148+
Pin::new(&mut self.it).poll_next(cx)
149+
}
150+
}

0 commit comments

Comments
 (0)