Commit 93cbfee

Test fix for filesystem iceberg
1 parent 563ffa9 commit 93cbfee

File tree

3 files changed: +32, -20 lines changed

docs/website/docs/general-usage/merge-loading.md

Lines changed: 1 addition & 1 deletion
@@ -554,7 +554,7 @@ The `upsert` merge strategy is currently supported for these destinations:
 - `mssql`
 - `postgres`
 - `snowflake`
-- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/delta-iceberg#known-limitations))
+- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/delta-iceberg#known-limitations)) and `iceberg` table format
 :::

 The `upsert` merge strategy does primary-key based *upserts*:
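
As a quick illustration of the documented behavior, here is a minimal sketch (not part of this commit) of an upsert merge into the `filesystem` destination with the `iceberg` table format; the resource name, sample rows, and local bucket URL are illustrative assumptions:

```py
import dlt

# Hypothetical resource: primary-key based upsert into an iceberg table.
# The bucket URL and sample data are assumptions for illustration only.
@dlt.resource(
    primary_key="id",
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    table_format="iceberg",
)
def issues():
    yield [{"id": 1, "state": "open"}, {"id": 2, "state": "closed"}]

pipeline = dlt.pipeline(
    pipeline_name="iceberg_upsert_demo",
    destination=dlt.destinations.filesystem("file:///tmp/iceberg_bucket"),
    dataset_name="github",
)
info = pipeline.run(issues())
print(info)
```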

tests/load/pipeline/test_merge_disposition.py

Lines changed: 22 additions & 9 deletions
@@ -820,6 +820,9 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -> None:
     ids=lambda x: x.name,
 )
 def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration) -> None:
+    import os
+
+    os.environ["LOAD__WORKERS"] = "1"
     p = destination_config.setup_pipeline("github_3", dev_mode=True)
     # do not save state to destination so jobs counting is easier
     p.config.restore_from_destination = False
@@ -828,11 +831,22 @@ def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration) -> None:
     github_data.max_table_nesting = 2
     github_data_copy = github()
     github_data_copy.max_table_nesting = 2
-    info = p.run(
-        [github_data, github_data_copy],
-        write_disposition="merge",
-        **destination_config.run_kwargs,
-    )
+    # iceberg filesystem requires input data without duplicates
+    if (
+        destination_config.table_format == "iceberg"
+        and destination_config.destination_type == "filesystem"
+    ):
+        info = p.run(
+            github_data,
+            write_disposition="merge",
+            **destination_config.run_kwargs,
+        )
+    else:
+        info = p.run(
+            [github_data, github_data_copy],
+            write_disposition="merge",
+            **destination_config.run_kwargs,
+        )
     assert_load_info(info)
     # make sure it was parquet or sql transforms
     expected_formats = ["parquet"]
@@ -844,10 +858,9 @@ def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration) -> None:

     github_1_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()])
     expected_rows = 100
-    # if table_format is set we use upsert which does not deduplicate input data
-    if not destination_config.supports_merge or (
-        destination_config.table_format and destination_config.destination_type != "athena"
-    ):
+    # if table_format is set to delta we use upsert which does not deduplicate input data
+    # otherwise the data is either deduplicated or it's iceberg filesystem for which we didn't pass duplicates at all
+    if destination_config.table_format == "delta":
         expected_rows *= 2
     assert github_1_counts["issues"] == expected_rows
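
The branch added above for the iceberg filesystem path reflects that its upsert expects no duplicate primary keys in the input. A minimal sketch (an assumption, not part of the commit) of pre-deduplicating rows by primary key before such a merge run:

```py
# Hypothetical helper: keep only the last row seen per primary key so the
# merge input contains each key at most once.
def dedupe_by_key(rows, key="id"):
    latest = {}
    for row in rows:
        latest[row[key]] = row
    return list(latest.values())

raw = [
    {"id": 1, "state": "open"},
    {"id": 1, "state": "closed"},  # duplicate key: the later row wins
    {"id": 2, "state": "open"},
]
assert dedupe_by_key(raw) == [
    {"id": 1, "state": "closed"},
    {"id": 2, "state": "open"},
]
```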

tests/load/pipeline/test_open_table_pipeline.py

Lines changed: 9 additions & 10 deletions
@@ -375,16 +375,15 @@ def nested_table():
     assert len(rows_dict["nested_table__child"]) == 3
     assert len(rows_dict["nested_table__child__grandchild"]) == 5

-    if destination_config.supports_merge:
-        # now drop children and grandchildren, use merge write disposition to create and pass full table chain
-        # also for tables that do not have jobs
-        info = pipeline.run(
-            [{"foo": 3}] * 10000,
-            table_name="nested_table",
-            primary_key="foo",
-            write_disposition="merge",
-        )
-        assert_load_info(info)
+    # now drop children and grandchildren, use merge write disposition to create and pass full table chain
+    # also for tables that do not have jobs
+    info = pipeline.run(
+        [{"foo": i} for i in range(3, 10003)],
+        table_name="nested_table",
+        primary_key="foo",
+        write_disposition="merge",
+    )
+    assert_load_info(info)


 @pytest.mark.parametrize(
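
The switch from `[{"foo": 3}] * 10000` to a range-based comprehension gives every row a distinct primary key, so the merge input carries no duplicates (the requirement noted for the iceberg filesystem path in the previous test). A tiny check of that difference (illustrative only, not from the commit):

```py
repeated = [{"foo": 3}] * 10000                    # one key repeated 10000 times
distinct = [{"foo": i} for i in range(3, 10003)]   # 10000 distinct keys

assert len({row["foo"] for row in repeated}) == 1
assert len({row["foo"] for row in distinct}) == 10000
```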
