
Commit c17d230

Test fix for filesystem iceberg
1 parent f07f995 commit c17d230

File tree

7 files changed: +93 additions, -137 deletions

dlt/common/libs/pyiceberg.py
docs/website/docs/dlt-ecosystem/destinations/iceberg.md
docs/website/docs/general-usage/merge-loading.md
poetry.lock
pyproject.toml
tests/load/pipeline/test_merge_disposition.py
tests/load/pipeline/test_open_table_pipeline.py

dlt/common/libs/pyiceberg.py

Lines changed: 16 additions & 0 deletions
@@ -26,6 +26,7 @@
     from pyiceberg.catalog import Catalog as IcebergCatalog
     from pyiceberg.exceptions import NoSuchTableError
     import pyarrow as pa
+    import pyiceberg.io.pyarrow as _pio
 except ModuleNotFoundError:
     raise MissingDependencyException(
         "dlt pyiceberg helpers",
@@ -34,6 +35,20 @@
     )
 
 
+# TODO: remove with pyiceberg's release after 0.9.1
+_orig_get_kwargs = _pio._get_parquet_writer_kwargs
+
+
+def _patched_get_parquet_writer_kwargs(table_properties):  # type: ignore[no-untyped-def]
+    """Return the original kwargs **plus** store_decimal_as_integer=True."""
+    kwargs = _orig_get_kwargs(table_properties)
+    kwargs.setdefault("store_decimal_as_integer", True)
+    return kwargs
+
+
+_pio._get_parquet_writer_kwargs = _patched_get_parquet_writer_kwargs
+
+
 def ensure_iceberg_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
     ARROW_TO_ICEBERG_COMPATIBLE_ARROW_TYPE_MAP = {
         pa.types.is_time32: pa.time64("us"),
@@ -82,6 +97,7 @@ def merge_iceberg_table(
     else:
         join_cols = get_columns_names_with_prop(schema, "primary_key")
 
+    # TODO: replace the batching method with transaction with pyiceberg's release after 0.9.1
     for rb in data.to_batches(max_chunksize=1_000):
         batch_tbl = pa.Table.from_batches([rb])
         batch_tbl = ensure_iceberg_compatible_arrow_data(batch_tbl)
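
The monkey-patch above forces `store_decimal_as_integer=True` into pyiceberg's parquet writer kwargs. As orientation, here is a minimal sketch of what that flag changes at the pyarrow level, assuming pyarrow >= 16 (where `pyarrow.parquet.write_table` accepts the option); the file name is illustrative:

import decimal
import pyarrow as pa
import pyarrow.parquet as pq

tbl = pa.table({"amount": pa.array([decimal.Decimal("1.23")], type=pa.decimal128(10, 2))})

# By default decimals are written as FIXED_LEN_BYTE_ARRAY; with the flag set (as the
# patched kwargs do), decimals with precision <= 18 get an integer physical type.
pq.write_table(tbl, "amount.parquet", store_decimal_as_integer=True)
print(pq.ParquetFile("amount.parquet").schema)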

docs/website/docs/dlt-ecosystem/destinations/iceberg.md

Lines changed: 5 additions & 1 deletion
@@ -120,7 +120,11 @@ The [S3-compatible](./filesystem.md#using-s3-compatible-storage) interface for G
 The `az` [scheme](./filesystem.md#supported-schemes) is not supported when using the `iceberg` table format. Please use the `abfss` scheme. This is because `pyiceberg`, which dlt used under the hood, currently does not support `az`.
 
 ## Table format `merge` support
-The [`upsert`](../../general-usage/merge-loading.md#upsert-strategy) merge strategy is supported for `iceberg`. This strategy requires that the input data contains no duplicate rows based on the key columns, and that the target table also does not contain duplicates on those keys.
+The [`upsert`](../../general-usage/merge-loading.md#upsert-strategy) merge strategy is supported for `iceberg`. This strategy requires that the input data contains no duplicate rows based on the key columns, and that the target table also does not contain duplicates on those keys.
+
+:::warning
+Until _pyiceberg_ > 0.9.1 is released, upsert is executed in chunks of **1000** rows.
+:::
 
 ```py
 @dlt.resource(
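
The 1000-row chunking mentioned in the warning comes from `merge_iceberg_table` splitting the input with pyarrow and upserting each batch separately (see the first file in this commit). A minimal illustration of that split; the table contents here are made up:

import pyarrow as pa

data = pa.table({"id": list(range(2_500)), "value": ["x"] * 2_500})

# the upsert is applied per record batch of at most 1_000 rows
batches = data.to_batches(max_chunksize=1_000)
print([b.num_rows for b in batches])  # [1000, 1000, 500]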

docs/website/docs/general-usage/merge-loading.md

Lines changed: 1 addition & 1 deletion
@@ -554,7 +554,7 @@ The `upsert` merge strategy is currently supported for these destinations:
 - `mssql`
 - `postgres`
 - `snowflake`
-- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/delta-iceberg#known-limitations))
+- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/delta-iceberg#known-limitations)) and `iceberg` table format
 :::
 
 The `upsert` merge strategy does primary-key based *upserts*:

poetry.lock

Lines changed: 42 additions & 113 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 3 deletions
@@ -104,9 +104,7 @@ db-dtypes = { version = ">=1.2.0", optional = true }
 # https://github.com/apache/airflow/issues/28723
 # pyiceberg = { version = ">=0.7.1", optional = true, extras = ["sql-sqlite"] }
 # we will rely on manual installation of `sqlalchemy>=2.0.18` instead
-# replace pyiceberg's version with the one released after 0.9.1
-pyiceberg = { git = "https://github.com/apache/iceberg-python.git", rev = "260ef54e3920d435ae3b2ccda090e66f9c1ac015", optional = true }
-# pyiceberg = { version = ">=0.9.1" , optional = true }
+pyiceberg = { version = ">=0.9.1" , optional = true }
 
 databricks-sdk = {version = ">=0.38.0", optional = true}
 pywin32 = {version = ">=306", optional = true, platform = "win32"}

tests/load/pipeline/test_merge_disposition.py

Lines changed: 19 additions & 9 deletions
@@ -828,11 +828,22 @@ def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration)
     github_data.max_table_nesting = 2
     github_data_copy = github()
     github_data_copy.max_table_nesting = 2
-    info = p.run(
-        [github_data, github_data_copy],
-        write_disposition="merge",
-        **destination_config.run_kwargs,
-    )
+    # iceberg filesystem requires input data without duplicates
+    if (
+        destination_config.table_format == "iceberg"
+        and destination_config.destination_type == "filesystem"
+    ):
+        info = p.run(
+            github_data,
+            write_disposition="merge",
+            **destination_config.run_kwargs,
+        )
+    else:
+        info = p.run(
+            [github_data, github_data_copy],
+            write_disposition="merge",
+            **destination_config.run_kwargs,
+        )
     assert_load_info(info)
     # make sure it was parquet or sql transforms
     expected_formats = ["parquet"]
@@ -844,10 +855,9 @@ def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration)
 
     github_1_counts = load_table_counts(p)
     expected_rows = 100
-    # if table_format is set we use upsert which does not deduplicate input data
-    if not destination_config.supports_merge or (
-        destination_config.table_format and destination_config.destination_type != "athena"
-    ):
+    # if table_format is set to delta we use upsert which does not deduplicate input data
+    # otherwise the data is either deduplicated or it's iceberg filesystem for which we didn't pass duplicates at all
+    if destination_config.table_format == "delta":
        expected_rows *= 2
     assert github_1_counts["issues"] == expected_rows
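
For context on the new iceberg/filesystem branch: running two identical `github()` sources duplicates every issue on its key columns, and iceberg upsert requires duplicate-free input, so only a single source is passed there. A quick, hypothetical way to see that requirement with pyarrow (not part of the test file):

import pyarrow as pa
import pyarrow.compute as pc

def has_duplicate_keys(tbl: pa.Table, key: str) -> bool:
    # duplicates exist when the number of distinct key values is below the row count
    return pc.count_distinct(tbl[key]).as_py() < tbl.num_rows

single_run = pa.table({"id": [1, 2, 3]})
double_run = pa.concat_tables([single_run, single_run])  # two identical sources
print(has_duplicate_keys(single_run, "id"), has_duplicate_keys(double_run, "id"))  # False True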

tests/load/pipeline/test_open_table_pipeline.py

Lines changed: 9 additions & 10 deletions
@@ -375,16 +375,15 @@ def nested_table():
     assert len(rows_dict["nested_table__child"]) == 3
     assert len(rows_dict["nested_table__child__grandchild"]) == 5
 
-    if destination_config.supports_merge:
-        # now drop children and grandchildren, use merge write disposition to create and pass full table chain
-        # also for tables that do not have jobs
-        info = pipeline.run(
-            [{"foo": 3}] * 10000,
-            table_name="nested_table",
-            primary_key="foo",
-            write_disposition="merge",
-        )
-        assert_load_info(info)
+    # now drop children and grandchildren, use merge write disposition to create and pass full table chain
+    # also for tables that do not have jobs
+    info = pipeline.run(
+        [{"foo": i} for i in range(3, 10003)],
+        table_name="nested_table",
+        primary_key="foo",
+        write_disposition="merge",
+    )
+    assert_load_info(info)
 
 
 @pytest.mark.parametrize(
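
The input change matters for the same reason: `[{"foo": 3}] * 10000` repeats one primary-key value 10,000 times, while the range-based comprehension yields unique keys, so the merge write disposition can also run against iceberg. A small standalone check (illustrative only, not from the test file):

old_rows = [{"foo": 3}] * 10000                     # every row shares primary key 3
new_rows = [{"foo": i} for i in range(3, 10003)]    # 10_000 rows, all keys unique

assert len({r["foo"] for r in old_rows}) == 1
assert len({r["foo"] for r in new_rows}) == len(new_rows)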
