Skip to content

Commit f07f995

Browse files
committed
Pyiceberg bumped, improved error messages, batching for iceberg
1 parent 3909e58 commit f07f995

File tree

6 files changed

+143
-59
lines changed

6 files changed

+143
-59
lines changed

dlt/common/libs/deltalake.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ def merge_delta_table(
118118
table: DeltaTable,
119119
data: Union[pa.Table, pa.RecordBatchReader],
120120
schema: TTableSchema,
121+
load_table_name: str,
121122
) -> None:
122123
"""Merges in-memory Arrow data into on-disk Delta table."""
123124

@@ -149,7 +150,10 @@ def merge_delta_table(
149150

150151
qry.execute()
151152
else:
152-
raise ValueError(f'Merge strategy "{strategy}" not supported.')
153+
raise ValueError(
154+
f'Merge strategy "{strategy}" is not supported for Delta tables. '
155+
f'Table: "{load_table_name}".'
156+
)
153157

154158

155159
def get_delta_tables(

dlt/common/libs/pyiceberg.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def merge_iceberg_table(
6868
table: IcebergTable,
6969
data: pa.Table,
7070
schema: TTableSchema,
71+
load_table_name: str,
7172
) -> None:
7273
"""Merges in-memory Arrow data into on-disk Iceberg table."""
7374
strategy = schema["x-merge-strategy"] # type: ignore[typeddict-item]
@@ -81,15 +82,22 @@ def merge_iceberg_table(
8182
else:
8283
join_cols = get_columns_names_with_prop(schema, "primary_key")
8384

84-
table.upsert(
85-
df=ensure_iceberg_compatible_arrow_data(data),
86-
join_cols=join_cols,
87-
when_matched_update_all=True,
88-
when_not_matched_insert_all=True,
89-
case_sensitive=True,
90-
)
85+
for rb in data.to_batches(max_chunksize=1_000):
86+
batch_tbl = pa.Table.from_batches([rb])
87+
batch_tbl = ensure_iceberg_compatible_arrow_data(batch_tbl)
88+
89+
table.upsert(
90+
df=batch_tbl,
91+
join_cols=join_cols,
92+
when_matched_update_all=True,
93+
when_not_matched_insert_all=True,
94+
case_sensitive=True,
95+
)
9196
else:
92-
raise ValueError(f'Merge strategy "{strategy}" not supported.')
97+
raise ValueError(
98+
f'Merge strategy "{strategy}" is not supported for Iceberg tables. '
99+
f'Table: "{load_table_name}".'
100+
)
93101

94102

95103
def get_sql_catalog(

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def run(self) -> None:
190190
table=delta_table,
191191
data=arrow_rbr,
192192
schema=self._load_table,
193+
load_table_name=self.load_table_name,
193194
)
194195
else:
195196
location = self._job_client.get_open_table_location("delta", self.load_table_name)
@@ -239,6 +240,7 @@ def run(self) -> None:
239240
table=table,
240241
data=self.arrow_dataset.to_table(),
241242
schema=self._load_table,
243+
load_table_name=self.load_table_name,
242244
)
243245
else:
244246
write_iceberg_table(

docs/website/docs/dlt-ecosystem/destinations/iceberg.md

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,9 @@ The [S3-compatible](./filesystem.md#using-s3-compatible-storage) interface for G
119119
## Iceberg Azure scheme
120120
The `az` [scheme](./filesystem.md#supported-schemes) is not supported when using the `iceberg` table format. Please use the `abfss` scheme. This is because `pyiceberg`, which dlt uses under the hood, currently does not support `az`.
121121

122-
## Table format `merge` support (**experimental**)
122+
## Table format `merge` support
123123
The [`upsert`](../../general-usage/merge-loading.md#upsert-strategy) merge strategy is supported for `iceberg`. This strategy requires that the input data contains no duplicate rows based on the key columns, and that the target table also does not contain duplicates on those keys.
124124

125-
:::caution
126-
The `upsert` merge strategy for the filesystem destination with Iceberg table format is **experimental**.
127-
:::
128-
129125
```py
130126
@dlt.resource(
131127
write_disposition={"disposition": "merge", "strategy": "upsert"},

0 commit comments

Comments
 (0)