Upsert merge strategy for iceberg #2671

Open · wants to merge 6 commits into devel from feat/2549-upsert-filesystem-iceberg

Conversation


@anuunchin anuunchin commented May 22, 2025

Description

This PR enables the "upsert" merge write disposition for Iceberg. The functionality mirrors the existing Delta implementation; under the hood, Iceberg's table upsert is used.
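Conceptually, an upsert merge means: rows whose key matches an existing row replace it, and all other rows are inserted. A minimal, dependency-free sketch of these semantics (plain dicts standing in for table rows; this is an illustration, not the actual pyiceberg call):

```python
def upsert_rows(target, incoming, key):
    """Merge `incoming` rows into `target`: matching keys update, new keys insert."""
    merged = {row[key]: row for row in target}
    for row in incoming:
        merged[row[key]] = row  # update on key match, insert otherwise
    return list(merged.values())

target = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
incoming = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
merged = upsert_rows(target, incoming, "id")
# id 2 is updated in place, id 3 is appended
```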

Related Issues

Additional Context (for failing tests)

  1. The test test_merge_on_keys_in_schema_nested_hints would likely fail with a maximum recursion error, as Iceberg's upsert has a limitation mentioned here. This is apparently resolved in pyiceberg 0.9.1, but I'm still getting a SIGILL error with 0.9.1 and a maximum recursion error with 0.9.0; I'm unsure of the reason.

    • Possible solution: The issue can be addressed by batching the input tables into chunks of 1000 rows each.
  2. The test test_table_format_schema_evolution is failing with Data type struct<a: int64> is not supported in join non-key field json, because Iceberg's upsert uses Arrow's hash join, which doesn't handle nested types. This was addressed with this PR in pyiceberg 0.9.1, but with 0.9.1 I get a different error: Unexpected physical type FIXED_LEN_BYTE_ARRAY for decimal(6, 4), expected INT32. The latter issue was also addressed in this PR in pyiceberg, but that fix is not yet released, so the issue should go away with pyiceberg's next release. (I tested with the newest code and it works 👀 )

    • Possible solution: We wait for the next pyiceberg release and upgrade to the latest version.
  3. The tests test_pipeline_load_parquet and test_table_format_child_tables fail because Iceberg's upsert strictly forbids duplicates in both the input data and the target table: if the key columns are not unique inside the source, it raises an error. Delta doesn't even seem to handle this properly 👀 .

    • Possible solution: We state the uniqueness requirement explicitly in the docs, leave it to the user to enforce it, and adjust the failing tests.
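The two workarounds proposed above (batching, and source-side key uniqueness) need no Iceberg dependency to illustrate. A hedged sketch — the batch size of 1000 comes from the discussion, the helper names and keep-last dedup policy are my own assumptions:

```python
def dedupe_on_key(rows, key):
    """Keep only the last row per key value, satisfying upsert's
    requirement that source keys be unique (illustrative policy)."""
    unique = {row[key]: row for row in rows}
    return list(unique.values())

def batches(rows, size=1000):
    """Yield fixed-size chunks so each upsert call processes a bounded
    number of rows (the suggested workaround for the recursion limit)."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

rows = [{"id": 1, "v": "old"}, {"id": 1, "v": "new"}, {"id": 2, "v": "x"}]
deduped = dedupe_on_key(rows, "id")       # duplicate id 1 collapses to the last row
chunks = list(batches(deduped, size=2))   # one chunk of 2 rows
```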


netlify bot commented May 22, 2025

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: b8f57be
🔍 Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/68415425965d740008263ca6

@anuunchin anuunchin self-assigned this May 22, 2025
@anuunchin anuunchin requested a review from rudolfix May 22, 2025 15:29
Collaborator

@sh-rp sh-rp left a comment


Thanks for working on this! About your questions:

  1. You can use the suggested code and document the behavior. I would also update to 0.9.1, try to isolate the SIGILL in a very small code block (I get it too), and open an issue in pyiceberg; they seem to have some kind of bug.
  2. If you update to pyiceberg 0.9.1, you'll see the tests for append and replace also start failing, so they changed something there that is unrelated to merging. I would also try to isolate this and see if you can figure out what provokes this error and whether this is a problem on our part or theirs.
  3. Yes, please document (you already did) and fix the tests accordingly. What happens in delta? Do we need a code or docs update there?

with table.update_schema() as update:
    update.union_by_name(ensure_iceberg_compatible_arrow_schema(data.schema))

if "parent" in schema:
Collaborator

I'm not sure about this child table loading strategy. I know you took it from delta, but it seems to me that the first unique column will be the _dlt_id, which will always be new since it is generated in the normalize step, so the merge condition is never met and we could just append. But maybe let's leave it like this for now.

Collaborator

upsert uses a deterministic row_key that is computed from the primary_key of the root table
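A deterministic row key of this kind can be sketched as a hash over the primary-key column values, so re-loading the same logical row always yields the same key and the merge condition can match. This is an illustrative sketch, not dlt's actual implementation; the function name, separator, and digest length are my own assumptions:

```python
import hashlib

def row_key(row, primary_key):
    """Derive a stable key from the primary-key columns: the same
    primary-key values always hash to the same row key."""
    # join key values with a separator unlikely to appear in data
    material = "\x1f".join(str(row[col]) for col in primary_key)
    return hashlib.sha256(material.encode("utf-8")).hexdigest()[:16]

k1 = row_key({"id": 7, "name": "a"}, ["id"])
k2 = row_key({"id": 7, "name": "b"}, ["id"])
assert k1 == k2  # non-key columns don't affect the row key
```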

@anuunchin anuunchin force-pushed the feat/2549-upsert-filesystem-iceberg branch 2 times, most recently from 8cd57d7 to 93cbfee Compare May 30, 2025 13:44
Collaborator

@rudolfix rudolfix left a comment


LGTM! We should document batching and use the PyPI version of pyiceberg.

with table.update_schema() as update:
    update.union_by_name(ensure_iceberg_compatible_arrow_schema(data.schema))

if "parent" in schema:
Collaborator

upsert uses a deterministic row_key that is computed from the primary_key of the root table

@anuunchin anuunchin force-pushed the feat/2549-upsert-filesystem-iceberg branch 2 times, most recently from 811ed93 to 7dfec68 Compare June 4, 2025 12:19
rudolfix
rudolfix previously approved these changes Jun 4, 2025
Collaborator

@rudolfix rudolfix left a comment


LGTM! But you need to resolve the conflict and merge devel...

@anuunchin anuunchin force-pushed the feat/2549-upsert-filesystem-iceberg branch from 7dfec68 to c17d230 Compare June 5, 2025 07:31
@anuunchin anuunchin force-pushed the feat/2549-upsert-filesystem-iceberg branch from c17d230 to b8f57be Compare June 5, 2025 08:24
Development

Successfully merging this pull request may close these issues.

enable upsert for filesystem / iceberg destination
3 participants