Skip to content

feat(arrow): Allow record batches output from read_sql #819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

chitralverma
Copy link

@chitralverma chitralverma commented Jun 20, 2025

Changes

  • Builds on the existing new_record_batch_iter to expose a pyarrow RecordBatchReader on python side
  • Supports completely lazy iterations over arrow stream destination
  • Added kwargs to read_sql, users can pass record_batch_size to control the number of records in each record batch.
  • fixed a few unwraps causing issues
  • Updates RecordBatchReader trait to support Send (helps offload RecordBatchReader to multi-threaded consumers like DuckDB)
  • Left existing implementations as is, ideally those can also rely on record batch approach

Usage/ Example

import connectorx as cx

conn = "mysql://username:password@server:port/database/"
query = "SELECT * FROM employees"

rb_iter = cx.read_sql(
    conn,
    query,
    return_type="arrow_record_batches",
    record_batch_size=120333,
)

closes #278

pub fn to_ptrs<'py>(&self, py: Python<'py>) -> Bound<'py, PyAny> {
let ptrs = py.allow_threads(
|| -> Result<(Vec<String>, Vec<Vec<(uintptr_t, uintptr_t)>>), ConnectorXPythonError> {
let rbs = vec![self.0.clone()];
Copy link
Author

@chitralverma chitralverma Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this okay or do you suggest any workarounds?

# doesn't work without `.clone()`, breaks with the following 

cannot move out of `self` which is behind a shared reference
move occurs because `self.0` has type `arrow::array::RecordBatch`, which does not implement the `Copy` trait

else:
raise ValueError(return_type)

return df


def reconstruct_arrow_rb(results) -> pa.RecordBatchReader:
Copy link
Author

@chitralverma chitralverma Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returns a pyarrow RecordBatchReader instead of an iterator/ generator of RecordBatch. I guess this will be useful for users who want to get the pyarrow Schema since RecordBatchReader has it.

@chitralverma
Copy link
Author

@wangxiaoying for your review.
If this seems ok, I'll update the PR with documentation/ examples and such.

@chitralverma chitralverma changed the title Allow record batches output from read_sql feat(arrow): Allow record batches output from read_sql Jun 20, 2025
@wangxiaoying
Copy link
Contributor

Thanks @chitralverma for the PR! I will take a look at it by the end of this week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Provide interface stream out arrow RecordBatch
2 participants