Skip to content

Commit 1ecfedf

Browse files
authored
Fix!: parse runtime-rendered fields, extract python env deps from merge_filter (#4905)
1 parent 7d69f1c commit 1ecfedf

File tree

3 files changed

+134
-15
lines changed

3 files changed

+134
-15
lines changed

sqlmesh/core/model/common.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,19 @@ def _executable_to_str(k: str, v: Executable) -> str:
440440
return [_executable_to_str(k, v) for k, v in sort_python_env(python_env)]
441441

442442

443+
def parse_strings_with_macro_refs(value: t.Any, dialect: DialectType) -> t.Any:
    """Parse strings that contain macro references into expressions.

    A string containing ``@`` is passed to ``exp.maybe_parse`` with the given
    dialect. Dicts and lists are walked recursively (dict values and list
    items). Every other value, including strings without ``@``, is returned
    unchanged.

    The input containers are never modified: dicts and lists are rebuilt and
    the new container is returned, so the caller's original object keeps its
    unparsed contents.

    Args:
        value: The value to inspect; may be a string, dict, list or anything else.
        dialect: The SQL dialect forwarded to ``exp.maybe_parse``.

    Returns:
        The parsed expression for a string containing ``@``, a new dict / list
        with its members processed recursively, or ``value`` itself otherwise.
    """
    if isinstance(value, str):
        # Only strings that may hold a macro reference are parsed.
        return exp.maybe_parse(value, dialect=dialect) if "@" in value else value

    if isinstance(value, dict):
        # Build a new dict instead of assigning into the caller's one.
        return {k: parse_strings_with_macro_refs(v, dialect) for k, v in value.items()}

    if isinstance(value, list):
        return [parse_strings_with_macro_refs(v, dialect) for v in value]

    return value
454+
455+
443456
expression_validator: t.Callable = field_validator(
444457
"query",
445458
"expressions_",

sqlmesh/core/model/definition.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
expression_validator,
2828
make_python_env,
2929
parse_dependencies,
30+
parse_strings_with_macro_refs,
3031
single_value_or_tuple,
3132
sorted_python_env_payloads,
3233
validate_extra_and_required_fields,
@@ -72,13 +73,14 @@
7273

7374
logger = logging.getLogger(__name__)
7475

76+
77+
UNRENDERABLE_MODEL_FIELDS = {"cron", "description"}
78+
7579
PROPERTIES = {"physical_properties", "session_properties", "virtual_properties"}
7680

7781
RUNTIME_RENDERED_MODEL_FIELDS = {
7882
"audits",
7983
"signals",
80-
"description",
81-
"cron",
8284
"merge_filter",
8385
} | PROPERTIES
8486

@@ -2469,6 +2471,9 @@ def _create_model(
24692471
if isinstance(property_values, exp.Tuple):
24702472
statements.extend(property_values.expressions)
24712473

2474+
if isinstance(getattr(kwargs.get("kind"), "merge_filter", None), exp.Expression):
2475+
statements.append(kwargs["kind"].merge_filter)
2476+
24722477
jinja_macro_references, used_variables = extract_macro_references_and_variables(
24732478
*(gen(e if isinstance(e, exp.Expression) else e[0]) for e in statements)
24742479
)
@@ -2751,21 +2756,24 @@ def render_field_value(value: t.Any) -> t.Any:
27512756

27522757
for field_name, field_info in ModelMeta.all_field_infos().items():
27532758
field = field_info.alias or field_name
2759+
field_value = fields.get(field)
27542760

2755-
if field in RUNTIME_RENDERED_MODEL_FIELDS:
2761+
# We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar
2762+
if field == "cron" or field_value is None:
27562763
continue
27572764

2758-
field_value = fields.get(field)
2759-
if field_value is None:
2765+
if field in RUNTIME_RENDERED_MODEL_FIELDS:
2766+
fields[field] = parse_strings_with_macro_refs(field_value, dialect)
27602767
continue
27612768

27622769
if isinstance(field_value, dict):
27632770
rendered_dict = {}
27642771
for key, value in field_value.items():
27652772
if key in RUNTIME_RENDERED_MODEL_FIELDS:
2766-
rendered_dict[key] = value
2773+
rendered_dict[key] = parse_strings_with_macro_refs(value, dialect)
27672774
elif (rendered := render_field_value(value)) is not None:
27682775
rendered_dict[key] = rendered
2776+
27692777
if rendered_dict:
27702778
fields[field] = rendered_dict
27712779
else:

tests/core/test_model.py

Lines changed: 107 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6436,7 +6436,7 @@ def end_date_macro(evaluator: MacroEvaluator, var: bool):
64366436
owner="@IF(@gateway = 'dev', @{dev_owner}, @{prod_owner})",
64376437
stamp="@{stamp}",
64386438
tags=["@{tag1}", "@{tag2}"],
6439-
description="Model desc @{test_}",
6439+
description="'Model desc @{test_}'",
64406440
)
64416441
def model_with_macros(evaluator, **kwargs):
64426442
return exp.select(
@@ -6480,26 +6480,109 @@ def model_with_macros(evaluator, **kwargs):
64806480
assert query.sql() == """SELECT 'test_value' AS "a" """.strip()
64816481

64826482

6483+
def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None:
# Loads a SQL model whose MODEL block uses macro references in its kind, merge_filter and
# physical/virtual/session properties, then checks which variables end up in python_env and
# that the property values and merge_filter are still unrendered expressions after loading.
# NOTE(review): the `mocker` fixture is not used anywhere in the visible body.
6484+
model = load_sql_based_model(
6485+
parse(
6486+
"""
6487+
MODEL (
6488+
name db.employees,
6489+
kind INCREMENTAL_BY_UNIQUE_KEY (
6490+
unique_key @{key},
6491+
merge_filter source.id > 0 and target.updated_at < @end_ds and source.updated_at > @start_ds and @merge_filter_var
6492+
),
6493+
cron '@daily',
6494+
allow_partials @IF(@gateway = 'dev', True, False),
6495+
physical_properties (
6496+
location1 = @'s3://bucket/prefix/@{schema_name}/@{table_name}',
6497+
location2 = @IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}'),
6498+
foo = @physical_var
6499+
),
6500+
virtual_properties (
6501+
creatable_type = @{create_type},
6502+
bar = @virtual_var,
6503+
),
6504+
session_properties (
6505+
'spark.executor.cores' = @IF(@gateway = 'dev', 1, 2),
6506+
'spark.executor.memory' = '1G',
6507+
baz = @session_var
6508+
),
6509+
);
6510+
6511+
SELECT * FROM src;
6512+
"""
6513+
),
6514+
variables={
6515+
"gateway": "dev",
6516+
"key": "a", # Not included in python_env because kind is rendered at load time
6517+
"create_type": "'SECURE'",
6518+
"merge_filter_var": True,
6519+
"physical_var": "bla",
6520+
"virtual_var": "blb",
6521+
"session_var": "blc",
6522+
},
6523+
)
6524+
6525+
# Expected vars: everything passed above except "key" (see the note on the variables dict).
assert model.python_env[c.SQLMESH_VARS] == Executable.value(
6526+
{
6527+
"gateway": "dev",
6528+
"create_type": "'SECURE'",
6529+
"merge_filter_var": True,
6530+
"physical_var": "bla",
6531+
"virtual_var": "blb",
6532+
"session_var": "blc",
6533+
}
6534+
)
6535+
6536+
assert "location1" in model.physical_properties
6537+
assert "location2" in model.physical_properties
6538+
6539+
# The properties will stay unrendered at load time
6540+
assert model.session_properties == {
6541+
"spark.executor.cores": exp.maybe_parse("@IF(@gateway = 'dev', 1, 2)"),
6542+
"spark.executor.memory": "1G",
6543+
"baz": exp.maybe_parse("@session_var"),
6544+
}
6545+
assert model.virtual_properties["creatable_type"] == exp.maybe_parse("@{create_type}")
6546+
6547+
assert (
6548+
model.physical_properties["location1"].sql()
6549+
== "@'s3://bucket/prefix/@{schema_name}/@{table_name}'"
6550+
)
6551+
assert (
6552+
model.physical_properties["location2"].sql()
6553+
== "@IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}')"
6554+
)
6555+
6556+
# merge_filter will stay unrendered as well
6557+
# unique_key, by contrast, is compared against the already-substituted column "a".
assert model.unique_key[0] == exp.column("a", quoted=True)
6558+
# The expected SQL keeps the @end_ds / @start_ds / @merge_filter_var references, while the
# source/target aliases from the MODEL block show up as __merge_source__ / __merge_target__.
assert (
6559+
t.cast(exp.Expression, model.merge_filter).sql()
6560+
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6561+
)
6562+
6563+
64836564
def test_unrendered_macros_python_model(mocker: MockerFixture) -> None:
64846565
@model(
64856566
"test_unrendered_macros_python_model_@{bar}",
64866567
is_sql=True,
64876568
kind=dict(
64886569
name=ModelKindName.INCREMENTAL_BY_UNIQUE_KEY,
64896570
unique_key="@{key}",
6490-
merge_filter="source.id > 0 and target.updated_at < @end_ds and source.updated_at > @start_ds",
6571+
merge_filter="source.id > 0 and target.updated_at < @end_ds and source.updated_at > @start_ds and @merge_filter_var",
64916572
),
64926573
cron="@daily",
64936574
columns={"a": "string"},
64946575
allow_partials="@IF(@gateway = 'dev', True, False)",
64956576
physical_properties=dict(
64966577
location1="@'s3://bucket/prefix/@{schema_name}/@{table_name}'",
64976578
location2="@IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}')",
6579+
foo="@physical_var",
64986580
),
6499-
virtual_properties={"creatable_type": "@{create_type}"},
6581+
virtual_properties={"creatable_type": "@{create_type}", "bar": "@virtual_var"},
65006582
session_properties={
65016583
"spark.executor.cores": "@IF(@gateway = 'dev', 1, 2)",
65026584
"spark.executor.memory": "1G",
6585+
"baz": "@session_var",
65036586
},
65046587
)
65056588
def model_with_macros(evaluator, **kwargs):
@@ -6517,12 +6600,24 @@ def model_with_macros(evaluator, **kwargs):
65176600
"gateway": "dev",
65186601
"key": "a",
65196602
"create_type": "'SECURE'",
6603+
"merge_filter_var": True,
6604+
"physical_var": "bla",
6605+
"virtual_var": "blb",
6606+
"session_var": "blc",
65206607
},
65216608
)
65226609

65236610
assert python_sql_model.name == "test_unrendered_macros_python_model_suffix"
65246611
assert python_sql_model.python_env[c.SQLMESH_VARS] == Executable.value(
6525-
{"test_var_a": "test_value"}
6612+
{
6613+
"test_var_a": "test_value",
6614+
"gateway": "dev",
6615+
"create_type": "'SECURE'",
6616+
"merge_filter_var": True,
6617+
"physical_var": "bla",
6618+
"virtual_var": "blb",
6619+
"session_var": "blc",
6620+
}
65266621
)
65276622
assert python_sql_model.enabled
65286623

@@ -6536,25 +6631,28 @@ def model_with_macros(evaluator, **kwargs):
65366631

65376632
# The properties will stay unrendered at load time
65386633
assert python_sql_model.session_properties == {
6539-
"spark.executor.cores": "@IF(@gateway = 'dev', 1, 2)",
6634+
"spark.executor.cores": exp.maybe_parse("@IF(@gateway = 'dev', 1, 2)"),
65406635
"spark.executor.memory": "1G",
6636+
"baz": exp.maybe_parse("@session_var"),
65416637
}
6542-
assert python_sql_model.virtual_properties["creatable_type"] == exp.convert("@{create_type}")
6638+
assert python_sql_model.virtual_properties["creatable_type"] == exp.maybe_parse(
6639+
"@{create_type}"
6640+
)
65436641

65446642
assert (
6545-
python_sql_model.physical_properties["location1"].text("this")
6643+
python_sql_model.physical_properties["location1"].sql()
65466644
== "@'s3://bucket/prefix/@{schema_name}/@{table_name}'"
65476645
)
65486646
assert (
6549-
python_sql_model.physical_properties["location2"].text("this")
6647+
python_sql_model.physical_properties["location2"].sql()
65506648
== "@IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}')"
65516649
)
65526650

65536651
# merge_filter will stay unrendered as well
65546652
assert python_sql_model.unique_key[0] == exp.column("a", quoted=True)
65556653
assert (
65566654
python_sql_model.merge_filter.sql()
6557-
== '"source"."id" > 0 AND "target"."updated_at" < @end_ds AND "source"."updated_at" > @start_ds'
6655+
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
65586656
)
65596657

65606658

0 commit comments

Comments
 (0)