Skip to content

Commit 1a7b2d7

Browse files
committed
feat: add DataFrame.to_pandas_batches() to download large DataFrame objects
1 parent 33a9d9f commit 1a7b2d7

File tree

3 files changed

+34
-4
lines changed

3 files changed

+34
-4
lines changed

bigframes/core/blocks.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,27 @@ def to_pandas(
412412
)
413413
return df, query_job
414414

415+
def to_pandas_batches(self):
416+
"""Run the query and yield one pandas DataFrame per Arrow table in the results."""
417+
dtypes = dict(zip(self.index_columns, self.index_dtypes))
418+
dtypes.update(zip(self.value_columns, self.dtypes))
419+
results_iterator, _ = self._expr.start_query()
420+
for arrow_table in results_iterator.to_arrow_iterable(
421+
bqstorage_client=self._expr._session.bqstoragereadclient
422+
):
423+
df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
424+
self._copy_index_to_pandas(df)  # modifies df in place
425+
yield df
426+
427+
def _copy_index_to_pandas(self, df: pd.DataFrame):
428+
"""Set the index on the pandas DataFrame to match this block.
429+
430+
Warning: This method modifies ``df`` in place.
431+
"""
432+
if self.index_columns:
433+
df.set_index(list(self.index_columns), inplace=True)
434+
df.index.names = self.index.names # type: ignore
435+
415436
def _compute_and_count(
416437
self,
417438
value_keys: Optional[Iterable[str]] = None,
@@ -485,10 +506,7 @@ def _compute_and_count(
485506
else:
486507
total_rows = results_iterator.total_rows
487508
df = self._to_dataframe(results_iterator)
488-
489-
if self.index_columns:
490-
df.set_index(list(self.index_columns), inplace=True)
491-
df.index.names = self.index.names # type: ignore
509+
self._copy_index_to_pandas(df)
492510

493511
return df, total_rows, query_job
494512

bigframes/dataframe.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,10 @@ def to_pandas(
893893
self._set_internal_query_job(query_job)
894894
return df.set_axis(self._block.column_labels, axis=1, copy=False)
895895

896+
def to_pandas_batches(self) -> Iterable[pandas.DataFrame]:
897+
"""Stream the DataFrame's results as an iterable of pandas DataFrames."""
898+
return self._block.to_pandas_batches()
899+
896900
def _compute_dry_run(self) -> bigquery.QueryJob:
897901
return self._block._compute_dry_run()
898902

tests/system/small/test_dataframe_io.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ def test_to_pandas_array_struct_correct_result(session):
8383
)
8484

8585

86+
def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
87+
"""Verify that every batch from to_pandas_batches() has the expected dtypes."""
88+
expected = scalars_df_default_index.dtypes
89+
for df in scalars_df_default_index.to_pandas_batches():
90+
actual = df.dtypes
91+
pd.testing.assert_series_equal(actual, expected)
92+
93+
8694
@pytest.mark.parametrize(
8795
("index"),
8896
[True, False],

0 commit comments

Comments
 (0)