
Commit 6882899

chore: make streamed_exec default
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent 0fa78a6 commit 6882899


3 files changed (+9, -3 lines)


python/deltalake/table.py

Lines changed: 3 additions & 2 deletions
@@ -980,7 +980,7 @@ def merge(
         error_on_type_mismatch: bool = True,
         writer_properties: Optional[WriterProperties] = None,
         large_dtypes: Optional[bool] = None,
-        streamed_exec: bool = False,
+        streamed_exec: bool = True,
         custom_metadata: Optional[Dict[str, str]] = None,
         post_commithook_properties: Optional[PostCommitHookProperties] = None,
         commit_properties: Optional[CommitProperties] = None,
@@ -998,7 +998,8 @@ def merge(
             error_on_type_mismatch: specify if merge will return error if data types are mismatching :default = True
             writer_properties: Pass writer properties to the Rust parquet writer
             large_dtypes: Deprecated, will be removed in 1.0
-            streamed_exec: Will execute MERGE using a LazyMemoryExec plan
+            streamed_exec: Will execute MERGE using a LazyMemoryExec plan; this reduces memory pressure for large
+                source tables. Enabling streamed_exec implicitly disables using source table stats to derive an early_pruning_predicate.
             arrow_schema_conversion_mode: Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched
             custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead.
             post_commithook_properties: properties for the post commit hook. If None, default values are used.
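
Because this commit flips the default, callers that relied on stats-based early pruning must now opt out explicitly. A minimal sketch of both modes against the new signature, assuming a hypothetical local table at ./my_table and a small made-up pyarrow source table:

    import pyarrow as pa
    from deltalake import DeltaTable

    dt = DeltaTable("./my_table")  # hypothetical table path
    source = pa.table({"id": [1, 2], "value": [10, 20]})  # made-up source data

    # New default: streamed execution via a LazyMemoryExec plan. Lower memory
    # pressure for large sources, but no early pruning predicate is derived
    # from source-table stats.
    dt.merge(
        source=source,
        predicate="t.id = s.id",
        source_alias="s",
        target_alias="t",
    ).when_matched_update_all().when_not_matched_insert_all().execute()

    # Opting back into the pre-commit behaviour, for when stats-based pruning
    # matters more than memory usage:
    dt.merge(
        source=source,
        predicate="t.id = s.id",
        source_alias="s",
        target_alias="t",
        streamed_exec=False,
    ).when_matched_update_all().when_not_matched_insert_all().execute()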

python/tests/test_generated_columns.py

Lines changed: 3 additions & 1 deletion
@@ -220,7 +220,9 @@ def test_merge_with_g_during_schema_evolution(
     expected_data = pa.Table.from_pydict(
         {"id": [1, 2], "gc": [5, 5]}, schema=pa.schema([id_col, gc])
     )
-    assert table_with_gc.to_pyarrow_table() == expected_data
+    assert (
+        table_with_gc.to_pyarrow_table().sort_by([("id", "ascending")]) == expected_data
+    )
 
 
 def test_raise_when_gc_passed_merge_statement_during_schema_evolution(
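
The sort was added presumably because streamed execution does not guarantee the row order of the merge output, while pyarrow table equality compares rows positionally. A self-contained illustration of why sort_by makes the assertion order-insensitive, with made-up data:

    import pyarrow as pa

    expected = pa.table({"id": [1, 2], "gc": [5, 5]})
    actual = pa.table({"id": [2, 1], "gc": [5, 5]})  # same rows, shuffled order

    assert actual != expected  # positional comparison fails on shuffled rows
    assert actual.sort_by([("id", "ascending")]) == expected  # passes after sorting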

python/tests/test_merge.py

Lines changed: 3 additions & 0 deletions
@@ -1116,6 +1116,7 @@ def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, pre
         predicate="s.datetime = t.datetime",
         source_alias="s",
         target_alias="t",
+        streamed_exec=False,  # only with streamed execution off can we use stats to create a pruning predicate
     ).when_matched_update_all().when_not_matched_insert_all().execute()
 
     result = dt.to_pyarrow_table()
@@ -1441,13 +1442,15 @@ def test_merge_on_decimal_3033(tmp_path):
         predicate="target.timestamp = source.timestamp",
         source_alias="source",
         target_alias="target",
+        streamed_exec=False,
     ).when_matched_update_all().when_not_matched_insert_all().execute()
 
     dt.merge(
         source=table,
         predicate="target.timestamp = source.timestamp AND target.altitude = source.altitude",
         source_alias="source",
         target_alias="target",
+        streamed_exec=False,  # only with streamed execution off can we use stats to create a pruning predicate
     ).when_matched_update_all().when_not_matched_insert_all().execute()
 
     string_predicate = dt.history(1)[0]["operationParameters"]["predicate"]
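
Both hunks pin streamed_exec=False because these tests assert on the pruning predicate recorded in the commit's operation parameters, which, per the docstring change above, is not derived from source stats under streamed execution. A sketch of the inspection pattern the tests use, assuming dt is a DeltaTable on which such a merge just ran:

    # Read the most recent commit's operation parameters; with
    # streamed_exec=False these include the stats-derived pruning predicate.
    last_commit = dt.history(1)[0]
    pruning_predicate = last_commit["operationParameters"]["predicate"]
    print(pruning_predicate)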
