Skip to content

Commit 34b89ba

Browse files
Jay-ju and richardliaw
authored and committed
[data] support new pyiceberg version (#51744)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: jukejian <jukejian@bytedance.com> Signed-off-by: Richard Liaw <rliaw@berkeley.edu> Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
1 parent 02686eb commit 34b89ba

7 files changed

+68
-29
lines changed

python/ray/data/_internal/datasource/iceberg_datasink.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from ray.data._internal.execution.interfaces import TaskContext
1212
from ray.data.datasource.datasink import WriteResult
1313
import uuid
14+
from packaging import version
1415

1516
if TYPE_CHECKING:
1617
from pyiceberg.catalog import Catalog
@@ -85,7 +86,15 @@ def _get_catalog(self) -> "Catalog":
8586

8687
def on_write_start(self) -> None:
8788
"""Prepare for the transaction"""
88-
from pyiceberg.table import PropertyUtil, TableProperties
89+
import pyiceberg
90+
from pyiceberg.table import TableProperties
91+
92+
if version.parse(pyiceberg.__version__) >= version.parse("0.9.0"):
93+
from pyiceberg.utils.properties import property_as_bool
94+
else:
95+
from pyiceberg.table import PropertyUtil
96+
97+
property_as_bool = PropertyUtil.property_as_bool
8998

9099
catalog = self._get_catalog()
91100
table = catalog.load_table(self.table_identifier)
@@ -103,7 +112,7 @@ def on_write_start(self) -> None:
103112
f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
104113
)
105114

106-
self._manifest_merge_enabled = PropertyUtil.property_as_bool(
115+
self._manifest_merge_enabled = property_as_bool(
107116
self._table_metadata.properties,
108117
TableProperties.MANIFEST_MERGE_ENABLED,
109118
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,

python/ray/data/_internal/datasource/iceberg_datasource.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
from ray.data.datasource.datasource import Datasource, ReadTask
1414
from ray.util.annotations import DeveloperAPI
1515

16+
from packaging import version
17+
import pyarrow as pa
18+
1619
if TYPE_CHECKING:
1720
from pyiceberg.catalog import Catalog
1821
from pyiceberg.expressions import BooleanExpression
@@ -34,21 +37,47 @@ def _get_read_task(
3437
limit: Optional[int],
3538
schema: "Schema",
3639
) -> Iterable[Block]:
37-
from pyiceberg.io import pyarrow as pyi_pa_io
38-
39-
# Use the PyIceberg API to read only a single task (specifically, a
40-
# FileScanTask) - note that this is not as simple as reading a single
41-
# parquet file, as there might be delete files, etc. associated, so we
42-
# must use the PyIceberg API for the projection.
43-
yield pyi_pa_io.project_table(
44-
tasks=tasks,
45-
table_metadata=table_metadata,
46-
io=table_io,
47-
row_filter=row_filter,
48-
projected_schema=schema,
49-
case_sensitive=case_sensitive,
50-
limit=limit,
51-
)
40+
# Determine the PyIceberg version to handle backward compatibility
41+
import pyiceberg
42+
43+
if version.parse(pyiceberg.__version__) >= version.parse("0.9.0"):
44+
# Modern implementation using ArrowScan (PyIceberg 0.9.0+)
45+
from pyiceberg.io.pyarrow import ArrowScan
46+
47+
# Initialize scanner with Iceberg metadata and query parameters
48+
scanner = ArrowScan(
49+
table_metadata=table_metadata,
50+
io=table_io,
51+
row_filter=row_filter,
52+
projected_schema=schema,
53+
case_sensitive=case_sensitive,
54+
limit=limit,
55+
)
56+
57+
# Convert scanned data to Arrow Table format
58+
result_table = scanner.to_table(tasks=tasks)
59+
60+
# Stream results as RecordBatches for memory efficiency
61+
for batch in result_table.to_batches():
62+
yield pa.Table.from_batches([batch])
63+
64+
else:
65+
# Legacy implementation using project_table (PyIceberg <0.9.0)
66+
from pyiceberg.io import pyarrow as pyi_pa_io
67+
68+
# Use the PyIceberg API to read only a single task (specifically, a
69+
# FileScanTask) - note that this is not as simple as reading a single
70+
# parquet file, as there might be delete files, etc. associated, so we
71+
# must use the PyIceberg API for the projection.
72+
yield pyi_pa_io.project_table(
73+
tasks=tasks,
74+
table_metadata=table_metadata,
75+
io=table_io,
76+
row_filter=row_filter,
77+
projected_schema=schema,
78+
case_sensitive=case_sensitive,
79+
limit=limit,
80+
)
5281

5382

5483
@DeveloperAPI

python/requirements/ml/data-test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ deltalake
1919
pytest-mock
2020
decord
2121
snowflake-connector-python
22-
pyiceberg[sql-sqlite]==0.7.0
22+
pyiceberg[sql-sqlite]==0.9.0
2323
clickhouse-connect
2424
hudi==0.2.0

python/requirements_compiled.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,11 @@ braceexpand==0.1.7
235235
# via webdataset
236236
bracex==2.4
237237
# via wcmatch
238-
cachetools==5.3.2
238+
cachetools==5.5.2
239239
# via
240240
# aim
241241
# google-auth
242+
# pyiceberg
242243
certifi==2025.1.31
243244
# via
244245
# -r python/requirements/cloud-requirements.txt
@@ -1649,7 +1650,7 @@ pygments==2.18.0
16491650
# nbconvert
16501651
# rich
16511652
# sphinx
1652-
pyiceberg==0.7.0
1653+
pyiceberg==0.9.0
16531654
# via -r python/requirements/ml/data-test-requirements.txt
16541655
pyjwt==2.8.0
16551656
# via

python/requirements_compiled_rayllm_py311_cpu.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,9 @@ blake3==1.0.4 \
241241
# via
242242
# -c python/requirements_compiled_rayllm_test_py311_cpu.txt
243243
# vllm
244-
cachetools==5.3.2 \
245-
--hash=sha256:086ee420196f7b2ab9ca2db2520aca326318b68fe5ba8bc4d49cca91add450f2 \
246-
--hash=sha256:861f35a13a451f94e301ce2bec7cac63e881232ccce7ed67fab9b5df4d3beaa1
244+
cachetools==5.5.2 \
245+
--hash=sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4 \
246+
--hash=sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a
247247
# via
248248
# -c python/requirements_compiled_rayllm_test_py311_cpu.txt
249249
# google-auth

python/requirements_compiled_rayllm_py311_cu121.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,9 @@ blake3==1.0.4 \
241241
# via
242242
# -c python/requirements_compiled_rayllm_test_py311_cu121.txt
243243
# vllm
244-
cachetools==5.3.2 \
245-
--hash=sha256:086ee420196f7b2ab9ca2db2520aca326318b68fe5ba8bc4d49cca91add450f2 \
246-
--hash=sha256:861f35a13a451f94e301ce2bec7cac63e881232ccce7ed67fab9b5df4d3beaa1
244+
cachetools==5.5.2 \
245+
--hash=sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4 \
246+
--hash=sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a
247247
# via
248248
# -c python/requirements_compiled_rayllm_test_py311_cu121.txt
249249
# google-auth

python/requirements_compiled_rayllm_py311_cu124.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,9 @@ blake3==1.0.4 \
241241
# via
242242
# -c python/requirements_compiled_rayllm_test_py311_cu124.txt
243243
# vllm
244-
cachetools==5.3.2 \
245-
--hash=sha256:086ee420196f7b2ab9ca2db2520aca326318b68fe5ba8bc4d49cca91add450f2 \
246-
--hash=sha256:861f35a13a451f94e301ce2bec7cac63e881232ccce7ed67fab9b5df4d3beaa1
244+
cachetools==5.5.2 \
245+
--hash=sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4 \
246+
--hash=sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a
247247
# via
248248
# -c python/requirements_compiled_rayllm_test_py311_cu124.txt
249249
# google-auth

0 commit comments

Comments (0)