Skip to content

Commit e12fdb6

Browse files
y-f-u and peasee authored
streaming arrow data support (#373)
* streaming arrow (#3) * streaming * remove invalid schema assign * use ArrowStream type to represent streaming * clippy * doc * import * typo * export arrow stream * fix: Missing semicolon in docs test --------- Co-authored-by: peasee <98815791+peasee@users.noreply.github.com>
1 parent 36b83bc commit e12fdb6

File tree

4 files changed

+123
-5
lines changed

4 files changed

+123
-5
lines changed

crates/duckdb/src/arrow_batch.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use super::{
33
Statement,
44
};
55

6-
/// An handle for the resulting RecordBatch of a query.
6+
/// A handle for the resulting RecordBatch of a query.
77
#[must_use = "Arrow is lazy and will do nothing unless consumed"]
88
pub struct Arrow<'stmt> {
99
pub(crate) stmt: Option<&'stmt Statement<'stmt>>,
@@ -29,3 +29,34 @@ impl<'stmt> Iterator for Arrow<'stmt> {
2929
Some(RecordBatch::from(&self.stmt?.step()?))
3030
}
3131
}
32+
33+
/// A handle for the resulting RecordBatch of a query in streaming
34+
#[must_use = "Arrow stream is lazy and will not fetch data unless consumed"]
35+
pub struct ArrowStream<'stmt> {
36+
pub(crate) stmt: Option<&'stmt Statement<'stmt>>,
37+
pub(crate) schema: SchemaRef,
38+
}
39+
40+
impl<'stmt> ArrowStream<'stmt> {
41+
#[inline]
42+
pub(crate) fn new(stmt: &'stmt Statement<'stmt>, schema: SchemaRef) -> ArrowStream<'stmt> {
43+
ArrowStream {
44+
stmt: Some(stmt),
45+
schema,
46+
}
47+
}
48+
49+
/// return arrow schema
50+
#[inline]
51+
pub fn get_schema(&self) -> SchemaRef {
52+
self.schema.clone()
53+
}
54+
}
55+
56+
impl<'stmt> Iterator for ArrowStream<'stmt> {
57+
type Item = RecordBatch;
58+
59+
fn next(&mut self) -> Option<Self::Item> {
60+
Some(RecordBatch::from(&self.stmt?.stream_step(self.get_schema())?))
61+
}
62+
}

crates/duckdb/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub use crate::r2d2::DuckdbConnectionManager;
7373
pub use crate::{
7474
appender::Appender,
7575
appender_params::{appender_params_from_iter, AppenderParams, AppenderParamsFromIter},
76-
arrow_batch::Arrow,
76+
arrow_batch::{Arrow, ArrowStream},
7777
cache::CachedStatement,
7878
column::Column,
7979
config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},

crates/duckdb/src/raw_statement.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{ffi::CStr, ptr, rc::Rc, sync::Arc};
1+
use std::{ffi::CStr, ops::Deref, ptr, rc::Rc, sync::Arc};
22

33
use arrow::{
44
array::StructArray,
@@ -9,14 +9,15 @@ use arrow::{
99
use super::{ffi, Result};
1010
#[cfg(feature = "polars")]
1111
use crate::arrow2;
12-
use crate::error::result_from_duckdb_arrow;
12+
use crate::{error::result_from_duckdb_arrow, Error};
1313

1414
// Private newtype for raw sqlite3_stmts that finalize themselves when dropped.
1515
// TODO: destroy statement and result
1616
#[derive(Debug)]
1717
pub struct RawStatement {
1818
ptr: ffi::duckdb_prepared_statement,
1919
result: Option<ffi::duckdb_arrow>,
20+
duckdb_result: Option<ffi::duckdb_result>,
2021
schema: Option<SchemaRef>,
2122
// Cached SQL (trimmed) that we use as the key when we're in the statement
2223
// cache. This is None for statements which didn't come from the statement
@@ -38,6 +39,7 @@ impl RawStatement {
3839
ptr: stmt,
3940
result: None,
4041
schema: None,
42+
duckdb_result: None,
4143
statement_cache_key: None,
4244
}
4345
}
@@ -110,6 +112,39 @@ impl RawStatement {
110112
}
111113
}
112114

/// Fetches the next data chunk from the active streaming result and
/// converts it into an Arrow `StructArray`.
///
/// Returns `None` when no streaming result is active, when the stream is
/// exhausted (null or empty chunk), or when the provided `schema` cannot be
/// exported to the Arrow C data interface.
#[inline]
pub fn streaming_step(&self, schema: SchemaRef) -> Option<StructArray> {
    if let Some(result) = self.duckdb_result {
        unsafe {
            // Pull the next chunk from the streaming result; a null chunk
            // signals the stream is exhausted.
            let mut out = ffi::duckdb_stream_fetch_chunk(result);

            if out.is_null() {
                return None;
            }

            // Export the chunk through the Arrow C data interface.
            // `duckdb_arrow_array` is an out-pointer, so we pass the address
            // of our empty FFI_ArrowArray for DuckDB to fill in.
            let mut arrays = FFI_ArrowArray::empty();
            ffi::duckdb_result_arrow_array(
                result,
                out,
                &mut std::ptr::addr_of_mut!(arrays) as *mut _ as *mut ffi::duckdb_arrow_array,
            );

            // The chunk contents now live in `arrays`; release the chunk.
            ffi::duckdb_destroy_data_chunk(&mut out);

            if arrays.is_empty() {
                return None;
            }

            // Reinterpret the FFI array using the caller-supplied schema.
            // NOTE(review): assumes `schema` matches the statement's actual
            // result schema — a mismatch would surface here at decode time.
            let schema = FFI_ArrowSchema::try_from(schema.deref()).ok()?;
            let array_data = from_ffi(arrays, &schema).expect("ok");
            let struct_array = StructArray::from(array_data);
            return Some(struct_array);
        }
    }

    // No streaming execution in progress (execute_streaming not called).
    None
}
147+
113148
#[cfg(feature = "polars")]
114149
#[inline]
115150
pub fn step2(&self) -> Option<arrow2::array::StructArray> {
@@ -242,6 +277,22 @@ impl RawStatement {
242277
}
243278
}
244279

280+
pub fn execute_streaming(&mut self) -> Result<()> {
281+
self.reset_result();
282+
unsafe {
283+
let mut out: ffi::duckdb_result = std::mem::zeroed();
284+
285+
let rc = ffi::duckdb_execute_prepared_streaming(self.ptr, &mut out);
286+
if rc != ffi::DuckDBSuccess {
287+
return Err(Error::DuckDBFailure(ffi::Error::new(rc), None));
288+
}
289+
290+
self.duckdb_result = Some(out);
291+
292+
Ok(())
293+
}
294+
}
295+
245296
#[inline]
246297
pub fn reset_result(&mut self) {
247298
self.schema = None;
@@ -251,6 +302,12 @@ impl RawStatement {
251302
}
252303
self.result = None;
253304
}
305+
if let Some(mut result) = self.duckdb_result {
306+
unsafe {
307+
ffi::duckdb_destroy_result(&mut result);
308+
}
309+
self.duckdb_result = None;
310+
}
254311
}
255312

256313
#[inline]

crates/duckdb/src/statement.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use super::{ffi, AndThenRows, Connection, Error, MappedRows, Params, RawStatemen
66
#[cfg(feature = "polars")]
77
use crate::{arrow2, polars_dataframe::Polars};
88
use crate::{
9-
arrow_batch::Arrow,
9+
arrow_batch::{Arrow, ArrowStream},
1010
error::result_from_duckdb_prepare,
1111
types::{TimeUnit, ToSql, ToSqlOutput},
1212
};
@@ -109,6 +109,30 @@ impl Statement<'_> {
109109
Ok(Arrow::new(self))
110110
}
111111

112+
/// Execute the prepared statement, returning a handle to the resulting
113+
/// vector of arrow RecordBatch in streaming way
114+
///
115+
/// ## Example
116+
///
117+
/// ```rust,no_run
118+
/// # use duckdb::{Result, Connection};
119+
/// # use arrow::record_batch::RecordBatch;
120+
/// # use arrow::datatypes::SchemaRef;
121+
/// fn get_arrow_data(conn: &Connection, schema: SchemaRef) -> Result<Vec<RecordBatch>> {
122+
/// Ok(conn.prepare("SELECT * FROM test")?.stream_arrow([], schema)?.collect())
123+
/// }
124+
/// ```
125+
///
126+
/// # Failure
127+
///
128+
/// Will return `Err` if binding parameters fails.
129+
#[inline]
130+
pub fn stream_arrow<P: Params>(&mut self, params: P, schema: SchemaRef) -> Result<ArrowStream<'_>> {
131+
params.__bind_in(self)?;
132+
self.stmt.execute_streaming()?;
133+
Ok(ArrowStream::new(self, schema))
134+
}
135+
112136
/// Execute the prepared statement, returning a handle to the resulting
113137
/// vector of polars DataFrame.
114138
///
@@ -337,6 +361,12 @@ impl Statement<'_> {
337361
self.stmt.step()
338362
}
339363

364+
/// Get next batch records in arrow-rs in a streaming way
365+
#[inline]
366+
pub fn stream_step(&self, schema: SchemaRef) -> Option<StructArray> {
367+
self.stmt.streaming_step(schema)
368+
}
369+
340370
#[cfg(feature = "polars")]
341371
/// Get next batch records in arrow2
342372
#[inline]

0 commit comments

Comments
 (0)