Commit 0adfa60

fix(cubesql): Split __user WHERE predicate into separate filter node
1 parent 362c32c commit 0adfa60

8 files changed, +356 -16 lines

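The new `FilterSplitMeta` rule (added in `filter_split_meta.rs` below) walks the logical plan and, for every `Filter` node, splits its `AND`-chained predicate into meta predicates (those referencing the `__user` column) and all remaining predicates, re-emitting them as two stacked `Filter` nodes with the meta filter closer to the table scan. The sketch below is a simplified, self-contained model of the splitting step only, using a toy `Pred` type and a string-based `is_meta` check as illustrative stand-ins for DataFusion's `Expr` and the real `is_meta_column`; none of these names come from the commit itself.

// Simplified, standalone model of the AND-splitting idea used by the rule.
#[derive(Clone, Debug, PartialEq)]
enum Pred {
    And(Box<Pred>, Box<Pred>),
    Leaf(String), // e.g. "c1 > 10" or "__user = 'postgres'"
}

fn is_meta(p: &Pred) -> bool {
    // Stand-in for `is_meta_column`: anything mentioning `__user` counts as meta.
    matches!(p, Pred::Leaf(s) if s.contains("__user"))
}

// Mirrors `split_predicates`: recurse through AND chains, classify each leaf.
fn split(p: &Pred, mut normal: Vec<Pred>, mut meta: Vec<Pred>) -> (Vec<Pred>, Vec<Pred>) {
    if let Pred::And(left, right) = p {
        let (normal, meta) = split(left, normal, meta);
        return split(right, normal, meta);
    }
    if is_meta(p) {
        meta.push(p.clone());
    } else {
        normal.push(p.clone());
    }
    (normal, meta)
}

fn main() {
    let leaf = |s: &str| Pred::Leaf(s.to_string());
    let and = |l: Pred, r: Pred| Pred::And(Box::new(l), Box::new(r));
    // (c1 > 10 AND __user = 'postgres') AND c2 < 5
    let predicate = and(
        and(leaf("c1 > 10"), leaf("__user = 'postgres'")),
        leaf("c2 < 5"),
    );
    let (normal, meta) = split(&predicate, vec![], vec![]);
    assert_eq!(normal, vec![leaf("c1 > 10"), leaf("c2 < 5")]);
    assert_eq!(meta, vec![leaf("__user = 'postgres'")]);
}

In the real rule, each of the two resulting predicate lists is folded back into a single expression and wrapped in its own `Filter`, with the meta filter emitted first so it sits below the normal filter in the plan.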

rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs

Lines changed: 2 additions & 2 deletions
@@ -984,12 +984,12 @@ mod tests {
     #[test]
     fn test_filter_down_cross_join_right_one_row() -> Result<()> {
         let plan = LogicalPlanBuilder::from(
-            LogicalPlanBuilder::from(make_sample_table("j1", vec!["c1"])?)
+            LogicalPlanBuilder::from(make_sample_table("j1", vec!["c1"], vec![])?)
                 .project(vec![col("c1")])?
                 .build()?,
         )
         .cross_join(
-            &LogicalPlanBuilder::from(make_sample_table("j2", vec!["c2"])?)
+            &LogicalPlanBuilder::from(make_sample_table("j2", vec!["c2"], vec![])?)
                 .project(vec![col("c2")])?
                 .aggregate(vec![] as Vec<Expr>, vec![count(lit(1u8))])?
                 .project_with_alias(
rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_split_meta.rs

Lines changed: 327 additions & 0 deletions
use std::sync::Arc;

use datafusion::{
    error::{DataFusionError, Result},
    logical_plan::{
        plan::{
            Aggregate, CrossJoin, Distinct, Join, Limit, Projection, Repartition, Sort, Subquery,
            Union, Window,
        },
        Column, Expr, Filter, LogicalPlan, Operator,
    },
    optimizer::optimizer::{OptimizerConfig, OptimizerRule},
    physical_plan::functions::BuiltinScalarFunction,
};

/// The `FilterSplitMeta` optimizer rule splits a `WHERE` clause into two distinct filters,
/// pushing meta filters (currently only `__user`) down the plan, separate from other filters.
/// This helps with SQL push down, as otherwise `CubeScan` would not contain `ChangeUserMember`
/// since the filters would contain replacers.
#[derive(Default)]
pub struct FilterSplitMeta {}

impl FilterSplitMeta {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}

impl OptimizerRule for FilterSplitMeta {
    fn optimize(
        &self,
        plan: &LogicalPlan,
        optimizer_config: &OptimizerConfig,
    ) -> Result<LogicalPlan> {
        filter_split_meta(self, plan, optimizer_config)
    }

    fn name(&self) -> &str {
        "__cube__filter_split_meta"
    }
}

/// Recursively optimizes the plan, searching for filters that can be split.
/// Continues optimizing down the plan after splitting.
fn filter_split_meta(
    optimizer: &FilterSplitMeta,
    plan: &LogicalPlan,
    optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
    match plan {
        LogicalPlan::Projection(Projection {
            expr,
            input,
            schema,
            alias,
        }) => Ok(LogicalPlan::Projection(Projection {
            expr: expr.clone(),
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
            schema: schema.clone(),
            alias: alias.clone(),
        })),
        LogicalPlan::Filter(Filter { predicate, input }) => {
            // Filter expressions can safely be moved around or split when they're chained with `AND`.
            // However, the input of `Filter` might be re-aliased, so we can't be sure that `__user`
            // really refers to the original meta column; it would make sense to apply this only when
            // the input is a `TableScan`, but joins complicate things further.
            // Still, there's no harm in splitting the `__user` filter from other filters anyway;
            // hence we split all `Filter` nodes.
            let (normal_predicates, meta_predicates) = split_predicates(predicate, vec![], vec![]);
            let mut plan = filter_split_meta(optimizer, input, optimizer_config)?;
            if !meta_predicates.is_empty() {
                plan = LogicalPlan::Filter(Filter {
                    predicate: collect_predicates(meta_predicates)?,
                    input: Arc::new(plan),
                });
            }
            if !normal_predicates.is_empty() {
                plan = LogicalPlan::Filter(Filter {
                    predicate: collect_predicates(normal_predicates)?,
                    input: Arc::new(plan),
                });
            }
            Ok(plan)
        }
        LogicalPlan::Window(Window {
            input,
            window_expr,
            schema,
        }) => Ok(LogicalPlan::Window(Window {
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
            window_expr: window_expr.clone(),
            schema: schema.clone(),
        })),
        LogicalPlan::Aggregate(Aggregate {
            input,
            group_expr,
            aggr_expr,
            schema,
        }) => Ok(LogicalPlan::Aggregate(Aggregate {
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
            group_expr: group_expr.clone(),
            aggr_expr: aggr_expr.clone(),
            schema: schema.clone(),
        })),
        LogicalPlan::Sort(Sort { expr, input }) => Ok(LogicalPlan::Sort(Sort {
            expr: expr.clone(),
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
        })),
        LogicalPlan::Join(Join {
            left,
            right,
            on,
            join_type,
            join_constraint,
            schema,
            null_equals_null,
        }) => Ok(LogicalPlan::Join(Join {
            left: Arc::new(filter_split_meta(optimizer, left, optimizer_config)?),
            right: Arc::new(filter_split_meta(optimizer, right, optimizer_config)?),
            on: on.clone(),
            join_type: join_type.clone(),
            join_constraint: join_constraint.clone(),
            schema: schema.clone(),
            null_equals_null: null_equals_null.clone(),
        })),
        LogicalPlan::CrossJoin(CrossJoin {
            left,
            right,
            schema,
        }) => Ok(LogicalPlan::CrossJoin(CrossJoin {
            left: Arc::new(filter_split_meta(optimizer, left, optimizer_config)?),
            right: Arc::new(filter_split_meta(optimizer, right, optimizer_config)?),
            schema: schema.clone(),
        })),
        LogicalPlan::Repartition(Repartition {
            input,
            partitioning_scheme,
        }) => Ok(LogicalPlan::Repartition(Repartition {
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
            partitioning_scheme: partitioning_scheme.clone(),
        })),
        LogicalPlan::Union(Union {
            inputs,
            schema,
            alias,
        }) => Ok(LogicalPlan::Union(Union {
            inputs: inputs
                .iter()
                .map(|plan| filter_split_meta(optimizer, plan, optimizer_config))
                .collect::<Result<_>>()?,
            schema: schema.clone(),
            alias: alias.clone(),
        })),
        plan @ LogicalPlan::TableScan(_) | plan @ LogicalPlan::EmptyRelation(_) => {
            // `TableScan` and `EmptyRelation` are as far as we can optimize.
            Ok(plan.clone())
        }
        LogicalPlan::Limit(Limit { skip, fetch, input }) => Ok(LogicalPlan::Limit(Limit {
            skip: skip.clone(),
            fetch: fetch.clone(),
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
        })),
        LogicalPlan::Subquery(Subquery {
            subqueries,
            input,
            schema,
            types,
        }) => Ok(LogicalPlan::Subquery(Subquery {
            subqueries: subqueries
                .iter()
                .map(|subquery| filter_split_meta(optimizer, subquery, optimizer_config))
                .collect::<Result<_>>()?,
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
            schema: schema.clone(),
            types: types.clone(),
        })),
        LogicalPlan::Distinct(Distinct { input }) => Ok(LogicalPlan::Distinct(Distinct {
            input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?),
        })),
        other => {
            // The rest of the plans have no inputs to optimize, or it makes no sense
            // to optimize them.
            Ok(other.clone())
        }
    }
}

/// Splits the provided predicate into two vectors: one for normal predicates and one for meta predicates.
/// These will later be concatenated into a single `Filter` node each.
fn split_predicates(
    predicate: &Expr,
    mut normal_predicates: Vec<Expr>,
    mut meta_predicates: Vec<Expr>,
) -> (Vec<Expr>, Vec<Expr>) {
    if let Expr::BinaryExpr { left, op, right } = predicate {
        if *op == Operator::And {
            let (normal_predicates, meta_predicates) =
                split_predicates(left, normal_predicates, meta_predicates);
            let (normal_predicates, meta_predicates) =
                split_predicates(right, normal_predicates, meta_predicates);
            return (normal_predicates, meta_predicates);
        }
    }

    if is_meta_predicate(predicate) {
        meta_predicates.push(predicate.clone());
    } else {
        normal_predicates.push(predicate.clone());
    }
    (normal_predicates, meta_predicates)
}

/// Determines if the provided expression is a meta predicate.
/// Supported variants:
/// - `BinaryExpr` with `Eq`, `Like`, or `ILike` operators and one of the sides being a meta column;
/// - `Like` or `ILike` with expr or pattern being a meta column;
/// - `IsNotNull` over a meta column (or `Not` over `IsNull` over a meta column);
/// - `InList` with one value in list and expr or list value being a meta column.
fn is_meta_predicate(predicate: &Expr) -> bool {
    match predicate {
        Expr::BinaryExpr { left, op, right } => {
            if matches!(op, Operator::Eq | Operator::Like | Operator::ILike) {
                return is_meta_column(left) || is_meta_column(right);
            }
            false
        }
        Expr::Like(like) | Expr::ILike(like) => {
            is_meta_column(&like.expr) || is_meta_column(&like.pattern)
        }
        Expr::IsNotNull(expr) => is_meta_column(expr),
        Expr::Not(expr) => match expr.as_ref() {
            Expr::IsNull(expr) => is_meta_column(expr),
            _ => false,
        },
        Expr::InList {
            expr,
            list,
            negated: false,
        } => {
            if list.len() != 1 {
                return false;
            }
            is_meta_column(expr) || is_meta_column(&list[0])
        }
        _ => false,
    }
}

/// Determines if the provided expression is a meta column reference.
/// Currently, only `__user` is considered a meta column.
/// Additionally, the `Lower` function over a meta column is also considered a meta column.
fn is_meta_column(expr: &Expr) -> bool {
    match expr {
        Expr::Column(Column { name, .. }) => name.eq_ignore_ascii_case("__user"),
        Expr::ScalarFunction { fun, args } => {
            if matches!(fun, BuiltinScalarFunction::Lower) && args.len() == 1 {
                return is_meta_column(&args[0]);
            }
            false
        }
        _ => false,
    }
}

/// Concatenates the provided predicates into a single expression using the `AND` operator.
fn collect_predicates(predicates: Vec<Expr>) -> Result<Expr> {
    predicates
        .into_iter()
        .rev()
        .reduce(|last, next| Expr::BinaryExpr {
            left: Box::new(next),
            op: Operator::And,
            right: Box::new(last),
        })
        .ok_or_else(|| {
            DataFusionError::Internal(
                "Unable to optimize plan: can't concatenate predicates, vec is unexpectedly empty"
                    .to_string(),
            )
        })
}

#[cfg(test)]
mod tests {
    use super::{super::utils::make_sample_table, *};
    use datafusion::logical_plan::{col, lit, LogicalPlanBuilder};

    fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
        let rule = FilterSplitMeta::new();
        rule.optimize(plan, &OptimizerConfig::new())
    }

    fn assert_optimized_plan_eq(plan: LogicalPlan, expected: &str) {
        let optimized_plan = optimize(&plan).expect("failed to optimize plan");
        let formatted_plan = format!("{:?}", optimized_plan);
        assert_eq!(formatted_plan, expected);
    }

    #[test]
    fn test_filter_split_meta() -> Result<()> {
        let plan = LogicalPlanBuilder::from(make_sample_table(
            "t1",
            vec!["c1", "c2", "c3"],
            vec!["__user"],
        )?)
        .filter(
            col("c1")
                .gt(lit(10i32))
                .and(col("__user").eq(lit("postgres".to_string())))
                .and(col("c2").lt(lit(5i32)))
                .and(col("__user").is_not_null()),
        )?
        .project(vec![col("c1"), col("c2"), col("c3")])?
        .build()?;

        let expected = "\
        Projection: #t1.c1, #t1.c2, #t1.c3\
        \n  Filter: #t1.c1 > Int32(10) AND #t1.c2 < Int32(5)\
        \n    Filter: #t1.__user = Utf8(\"postgres\") AND #t1.__user IS NOT NULL\
        \n      TableScan: t1 projection=None\
        ";

        assert_optimized_plan_eq(plan, expected);
        Ok(())
    }
}
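One detail worth noting in `collect_predicates` above: reversing the predicates before `reduce` right-nests the `AND` chain while preserving the original left-to-right order of the predicates. A minimal standalone sketch of the same fold, over plain strings rather than DataFusion `Expr` values (the `collect` helper here is illustrative, not from the commit):

// Same rev()+reduce() shape as `collect_predicates`, but over strings so it
// can run standalone; parentheses show how the AND chain is nested.
fn collect(predicates: Vec<&str>) -> Option<String> {
    predicates
        .into_iter()
        .map(String::from)
        .rev()
        .reduce(|last, next| format!("({} AND {})", next, last))
}

fn main() {
    // ["a", "b", "c"] folds to a AND (b AND c): right-nested, original order kept.
    assert_eq!(
        collect(vec!["a", "b", "c"]).as_deref(),
        Some("(a AND (b AND c))")
    );
    // The real code maps the empty case to a DataFusionError::Internal instead of None.
    assert_eq!(collect(vec![]), None);
}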

rust/cubesql/cubesql/src/compile/engine/df/optimizers/limit_push_down.rs

Lines changed: 2 additions & 2 deletions
@@ -449,12 +449,12 @@ mod tests {
     #[test]
     fn test_limit_down_cross_join_right_one_row() -> Result<()> {
         let plan = LogicalPlanBuilder::from(
-            LogicalPlanBuilder::from(make_sample_table("j1", vec!["c1"])?)
+            LogicalPlanBuilder::from(make_sample_table("j1", vec!["c1"], vec![])?)
                 .project(vec![col("c1")])?
                 .build()?,
         )
         .cross_join(
-            &LogicalPlanBuilder::from(make_sample_table("j2", vec!["c2"])?)
+            &LogicalPlanBuilder::from(make_sample_table("j2", vec!["c2"], vec![])?)
                 .project(vec![col("c2")])?
                 .aggregate(vec![] as Vec<Expr>, vec![count(lit(1u8))])?
                 .project_with_alias(
rust/cubesql/cubesql/src/compile/engine/df/optimizers/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -1,9 +1,11 @@
 pub mod utils;
 
 mod filter_push_down;
+mod filter_split_meta;
 mod limit_push_down;
 mod sort_push_down;
 
 pub use filter_push_down::FilterPushDown;
+pub use filter_split_meta::FilterSplitMeta;
 pub use limit_push_down::LimitPushDown;
 pub use sort_push_down::SortPushDown;
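`FilterSplitMeta` is exported alongside the existing rules, so a caller can run it in the same pass list. The wiring below is a hypothetical sketch only: the actual registration point, the rule order, and the `new()` constructors of the other rules are assumptions rather than part of this commit.

// Hypothetical wiring sketch, not taken from this commit: apply the cubesql
// optimizer rules in sequence, with FilterSplitMeta placed before the push-down
// rules so the meta filter and the normal filter travel as separate nodes.
use datafusion::{
    error::Result,
    logical_plan::LogicalPlan,
    optimizer::optimizer::{OptimizerConfig, OptimizerRule},
};

use crate::compile::engine::df::optimizers::{
    FilterPushDown, FilterSplitMeta, LimitPushDown, SortPushDown,
};

fn optimize_with_cube_rules(plan: &LogicalPlan) -> Result<LogicalPlan> {
    // FilterSplitMeta::new() exists in this commit; the other ::new() calls are
    // assumed to follow the same pattern.
    let rules: Vec<Box<dyn OptimizerRule + Send + Sync>> = vec![
        Box::new(FilterSplitMeta::new()),
        Box::new(FilterPushDown::new()),
        Box::new(LimitPushDown::new()),
        Box::new(SortPushDown::new()),
    ];
    let config = OptimizerConfig::new();
    let mut plan = plan.clone();
    for rule in &rules {
        plan = rule.optimize(&plan, &config)?;
    }
    Ok(plan)
}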
