Upsert merge strategy for iceberg #2671
base: devel
Conversation
Thanks for working on this! About your questions:
- You can use the suggested code and document the behavior. I would also update to pyiceberg 0.9.1, try to isolate the SIGILL in a very small code block (I get it too), and open an issue in pyiceberg; they seem to have some kind of bug.
- If you update to pyiceberg 0.9.1, you'll see the tests for append and replace also start failing, so they changed something there that is unrelated to merging. I would also try to isolate this and see if you can figure out what provokes this error and whether it is a problem on our part or theirs.
- Yes, please document (you already did) and fix the tests accordingly. What happens in delta? Do we need a code or docs update there?
with table.update_schema() as update:
    update.union_by_name(ensure_iceberg_compatible_arrow_schema(data.schema))

if "parent" in schema:
I'm not sure about this child table loading strategy. I know you took it from delta, but it seems to me that the first unique column will be _dlt_id, which will always be new since it is generated in the normalize step; thus the merge condition is never met and we could just append. But maybe let's leave it like this for now.
upsert is using a deterministic row_key that is computed from the primary_key of the root table
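To illustrate the point above: a row key derived deterministically from the primary key values maps the same record to the same key on every pipeline run, so the upsert merge condition can match on reload. A minimal pure-Python sketch; the function name and hashing details are illustrative, not dlt's actual implementation:

```python
import base64
import hashlib

def deterministic_row_key(primary_key_values: list) -> str:
    # hash the primary key values so the same record always maps to the
    # same row key across pipeline runs (illustrative only, not dlt's code)
    payload = "|".join(map(str, primary_key_values)).encode("utf-8")
    return base64.b16encode(hashlib.sha256(payload).digest()[:10]).decode()

# same primary key -> same row key, so an upsert merge condition can match;
# a random id generated at normalize time would never match existing rows
assert deterministic_row_key([42, "alice"]) == deterministic_row_key([42, "alice"])
assert deterministic_row_key([42, "alice"]) != deterministic_row_key([43, "bob"])
```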
LGTM! We should document batching and use the PyPI version of pyiceberg.
LGTM! But you need to resolve the conflict and merge devel...
Description
This PR enables the "upsert" merge strategy for Iceberg. The functionality is similar to how it works for delta. Under the hood, Iceberg's table upsert is used.
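For clarity, "upsert" semantics here mean: rows whose key already exists in the target table are updated, all other rows are inserted, and nothing is deleted. A minimal pure-Python sketch of that behavior (not dlt's or pyiceberg's actual code):

```python
def upsert(target: dict, batch: list[dict], key: str) -> dict:
    # target maps a key value to its current row; rows with a matching
    # key are overwritten (update), unseen keys are added (insert)
    for row in batch:
        target[row[key]] = row
    return target

# id 1 exists and is updated; id 2 is new and is inserted
state = {1: {"id": 1, "name": "alice"}}
upsert(state, [{"id": 1, "name": "alicia"}, {"id": 2, "name": "bob"}], "id")
```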
Related Issues
Additional Context (for failing tests)
- The test test_merge_on_keys_in_schema_nested_hints would likely fail with a maximum recursion error, as Iceberg's upsert has a limitation mentioned here. This is apparently resolved in pyiceberg 0.9.1, but I'm still getting a SIGILL error with 0.9.1 and a maximum recursion error with 0.9.0, the reason for which I'm unsure about.
- The test test_table_format_schema_evolution is failing with Data type struct<a: int64> is not supported in join non-key field json, because Iceberg's upsert uses Arrow's hash join, which doesn't handle nested types. The issue was addressed with this PR in pyiceberg 0.9.1, but with 0.9.1 I get a different error saying Unexpected physical type FIXED_LEN_BYTE_ARRAY for decimal(6, 4), expected INT32. The latter issue was also addressed in this PR in pyiceberg, but it's not yet released. Therefore, the issue should go away with pyiceberg's next release. (I tested with the newest code and it works 👀)
- The tests test_pipeline_load_parquet and test_table_format_child_tables fail because Iceberg's upsert strictly forbids duplicates in the input data as well as in the target table: if the key columns are not unique inside the source, it raises an error. Delta doesn't even seem to be working properly 👀.
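Since pyiceberg's upsert raises on duplicate key values in the source batch, one possible workaround is to deduplicate before calling it, e.g. keeping the last row per key. A hedged pure-Python sketch; dlt may end up handling this differently:

```python
def dedupe_keep_last(rows: list[dict], key_cols: list[str]) -> list[dict]:
    # later rows overwrite earlier ones with the same key; dicts keep
    # insertion order (Python 3.7+), so first-seen key order is preserved
    seen: dict = {}
    for row in rows:
        seen[tuple(row[c] for c in key_cols)] = row
    return list(seen.values())

# duplicates on "id" collapse to the last occurrence before the upsert call
rows = [{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}]
deduped = dedupe_keep_last(rows, ["id"])
```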