Skip to content

Commit 4d24e91

Browse files
committed
physical expr: support placeholders
This patch adds support for placeholders at the physical expression level. It allows generic plans to be used, with placeholders resolved at the execution stage. Placeholders are resolved during the `execution(...)` phase. Each `ExecutionPlan` is responsible for resolving placeholders in its own expressions. Example: ```sh > create table a(x int); > explain select x + $1 from a where x > $2; +---------------+-------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------+ | logical_plan | Projection: a.x + $1 | | | Filter: a.x > $2 | | | TableScan: a projection=[x] | | physical_plan | ProjectionExec: expr=[x@0 + $1 as a.x + $1] | | | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: x@0 > $2 | | | MemoryExec: partitions=1, partition_sizes=[0] | | | | +---------------+-------------------------------------------------------------------------+ ```
1 parent 7cedeaf commit 4d24e91

39 files changed

+1076
-93
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ impl WindowUDFImpl for SmoothItUdf {
8080
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
8181
Ok(Box::new(MyPartitionEvaluator::new()))
8282
}
83+
84+
fn resolve_placeholders(
85+
&self,
86+
_param_values: &Option<datafusion_common::ParamValues>,
87+
) -> Result<Option<std::sync::Arc<dyn WindowUDFImpl>>> {
88+
Ok(None)
89+
}
8390
}
8491

8592
/// This implements the lowest level evaluation for a window function

datafusion-examples/examples/simplify_udwf_expression.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
8484

8585
Some(Box::new(simplify))
8686
}
87+
88+
fn resolve_placeholders(
89+
&self,
90+
_param_values: &Option<datafusion_common::ParamValues>,
91+
) -> Result<Option<std::sync::Arc<dyn WindowUDFImpl>>> {
92+
Ok(None)
93+
}
8794
}
8895

8996
// create local execution context with `cars.csv` registered as a table named `cars`

datafusion/core/tests/user_defined/user_defined_window_functions.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,13 @@ impl OddCounter {
565565
fn aliases(&self) -> &[String] {
566566
&self.aliases
567567
}
568+
569+
fn resolve_placeholders(
570+
&self,
571+
_param_values: &Option<datafusion_common::ParamValues>,
572+
) -> Result<Option<Arc<dyn WindowUDFImpl>>> {
573+
Ok(None)
574+
}
568575
}
569576

570577
ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))

datafusion/execution/src/task.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
registry::FunctionRegistry,
2727
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
2828
};
29-
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
29+
use datafusion_common::{plan_datafusion_err, DataFusionError, ParamValues, Result};
3030
use datafusion_expr::planner::ExprPlanner;
3131
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
3232

@@ -53,6 +53,8 @@ pub struct TaskContext {
5353
window_functions: HashMap<String, Arc<WindowUDF>>,
5454
/// Runtime environment associated with this task context
5555
runtime: Arc<RuntimeEnv>,
56+
/// Param values for physical placeholders.
57+
param_values: Option<ParamValues>,
5658
}
5759

5860
impl Default for TaskContext {
@@ -70,6 +72,7 @@ impl Default for TaskContext {
7072
aggregate_functions: HashMap::new(),
7173
window_functions: HashMap::new(),
7274
runtime,
75+
param_values: None,
7376
}
7477
}
7578
}
@@ -97,6 +100,7 @@ impl TaskContext {
97100
aggregate_functions,
98101
window_functions,
99102
runtime,
103+
param_values: None,
100104
}
101105
}
102106

@@ -125,6 +129,17 @@ impl TaskContext {
125129
Arc::clone(&self.runtime)
126130
}
127131

132+
/// Return param values associated with this [`TaskContext`].
133+
pub fn param_values(&self) -> &Option<ParamValues> {
134+
&self.param_values
135+
}
136+
137+
/// Update the param values.
138+
pub fn with_param_values(mut self, param_values: ParamValues) -> Self {
139+
self.param_values = Some(param_values);
140+
self
141+
}
142+
128143
/// Update the [`SessionConfig`]
129144
pub fn with_session_config(mut self, session_config: SessionConfig) -> Self {
130145
self.session_config = session_config;

datafusion/expr/src/expr_fn.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,13 @@ impl WindowUDFImpl for SimpleWindowUDF {
665665
fn partition_evaluator(&self) -> Result<Box<dyn crate::PartitionEvaluator>> {
666666
(self.partition_evaluator_factory)()
667667
}
668+
669+
fn resolve_placeholders(
670+
&self,
671+
_param_values: &Option<datafusion_common::ParamValues>,
672+
) -> Result<Option<Arc<dyn WindowUDFImpl>>> {
673+
Ok(None)
674+
}
668675
}
669676

670677
pub fn interval_year_month_lit(value: &str) -> Expr {

datafusion/expr/src/udwf.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::{
2727
sync::Arc,
2828
};
2929

30-
use datafusion_common::{not_impl_err, Result};
30+
use datafusion_common::{not_impl_err, ParamValues, Result};
3131

3232
use crate::expr::WindowFunction;
3333
use crate::{
@@ -107,9 +107,12 @@ impl WindowUDF {
107107
where
108108
F: WindowUDFImpl + 'static,
109109
{
110-
Self {
111-
inner: Arc::new(fun),
112-
}
110+
Self::new_from_arc_impl(Arc::new(fun))
111+
}
112+
113+
/// Create a new `WindowUDF` from a dyn [`WindowUDFImpl`].
114+
pub fn new_from_arc_impl(fun: Arc<dyn WindowUDFImpl>) -> Self {
115+
Self { inner: fun }
113116
}
114117

115118
/// Return the underlying [`WindowUDFImpl`] trait object for this function
@@ -384,6 +387,15 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
384387
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
385388
not_impl_err!("Function {} does not implement coerce_types", self.name())
386389
}
390+
391+
/// Resolve placeholders in this expression.
392+
/// Returns [`Some`] with the rewritten expression if there is
393+
/// at least one placeholder.
394+
/// Otherwise returns [`None`].
395+
fn resolve_placeholders(
396+
&self,
397+
param_values: &Option<ParamValues>,
398+
) -> Result<Option<Arc<dyn WindowUDFImpl>>>;
387399
}
388400

389401
impl PartialEq for dyn WindowUDFImpl {
@@ -476,6 +488,21 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
476488
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
477489
self.inner.coerce_types(arg_types)
478490
}
491+
492+
fn resolve_placeholders(
493+
&self,
494+
param_values: &Option<ParamValues>,
495+
) -> Result<Option<Arc<dyn WindowUDFImpl>>> {
496+
let inner = self.inner.resolve_placeholders(param_values)?;
497+
Ok(if let Some(inner) = inner {
498+
Some(Arc::new(Self {
499+
inner: inner,
500+
aliases: self.aliases.clone(),
501+
}))
502+
} else {
503+
None
504+
})
505+
}
479506
}
480507

481508
/// Implementation of [`WindowUDFImpl`] that wraps the function style pointers
@@ -525,6 +552,13 @@ impl WindowUDFImpl for WindowUDFLegacyWrapper {
525552
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
526553
(self.partition_evaluator_factory)()
527554
}
555+
556+
fn resolve_placeholders(
557+
&self,
558+
_param_values: &Option<ParamValues>,
559+
) -> Result<Option<Arc<dyn WindowUDFImpl>>> {
560+
Ok(None)
561+
}
528562
}
529563

530564
#[cfg(test)]
@@ -570,6 +604,12 @@ mod test {
570604
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
571605
unimplemented!()
572606
}
607+
fn resolve_placeholders(
608+
&self,
609+
_param_values: &Option<datafusion_common::ParamValues>,
610+
) -> Result<Option<std::sync::Arc<dyn WindowUDFImpl>>> {
611+
Ok(None)
612+
}
573613
}
574614

575615
#[derive(Debug, Clone)]
@@ -606,6 +646,12 @@ mod test {
606646
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
607647
unimplemented!()
608648
}
649+
fn resolve_placeholders(
650+
&self,
651+
_param_values: &Option<datafusion_common::ParamValues>,
652+
) -> Result<Option<std::sync::Arc<dyn WindowUDFImpl>>> {
653+
Ok(None)
654+
}
609655
}
610656

611657
#[test]

datafusion/functions-window/src/row_number.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ impl WindowUDFImpl for RowNumber {
102102
nulls_first: false,
103103
})
104104
}
105+
106+
fn resolve_placeholders(
107+
&self,
108+
_param_values: &Option<datafusion_common::ParamValues>,
109+
) -> Result<Option<std::sync::Arc<dyn WindowUDFImpl>>> {
110+
Ok(None)
111+
}
105112
}
106113

107114
/// State for the `row_number` built-in window function.

datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3916,5 +3916,12 @@ mod tests {
39163916
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
39173917
unimplemented!("not needed for tests")
39183918
}
3919+
3920+
fn resolve_placeholders(
3921+
&self,
3922+
_param_values: &Option<datafusion_common::ParamValues>,
3923+
) -> Result<Option<Arc<dyn WindowUDFImpl>>> {
3924+
Ok(None)
3925+
}
39193926
}
39203927
}

datafusion/physical-expr/src/aggregate.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub mod utils {
3535
}
3636

3737
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38-
use datafusion_common::ScalarValue;
3938
use datafusion_common::{internal_err, not_impl_err, Result};
39+
use datafusion_common::{ParamValues, ScalarValue};
4040
use datafusion_expr::AggregateUDF;
4141
use datafusion_expr::ReversedUDAF;
4242
use datafusion_expr_common::accumulator::Accumulator;
@@ -52,6 +52,8 @@ use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
5252
use std::fmt::Debug;
5353
use std::sync::Arc;
5454

55+
use crate::expressions::resolve_placeholders_seq;
56+
5557
/// Builder for physical [`AggregateFunctionExpr`]
5658
///
5759
/// `AggregateFunctionExpr` contains the information necessary to call
@@ -528,6 +530,38 @@ impl AggregateFunctionExpr {
528530
pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
529531
self.fun.default_value(data_type)
530532
}
533+
534+
/// Resolves placeholders.
535+
/// If there are no placeholders returns [`None`].
536+
/// Otherwise returns [`Some`] that contains resolved expression.
537+
pub fn resolve_placeholders(
538+
self: &Arc<Self>,
539+
param_values: &Option<ParamValues>,
540+
) -> Result<Option<Arc<Self>>> {
541+
Ok(
542+
if let Some(resolved_args) =
543+
resolve_placeholders_seq(&self.args, param_values)?
544+
{
545+
Some(Arc::new(AggregateFunctionExpr {
546+
fun: self.fun.clone(),
547+
args: resolved_args,
548+
data_type: self.data_type.clone(),
549+
name: self.name.clone(),
550+
schema: self.schema.clone(),
551+
ordering_req: self.ordering_req.clone(),
552+
ignore_nulls: self.ignore_nulls,
553+
ordering_fields: self.ordering_fields.clone(),
554+
is_distinct: self.is_distinct,
555+
is_reversed: self.is_reversed,
556+
input_types: self.input_types.clone(),
557+
is_nullable: self.is_nullable,
558+
}))
559+
} else {
560+
// Args do not contain placeholders at all.
561+
None
562+
},
563+
)
564+
}
531565
}
532566

533567
/// Stores the physical expressions used inside the `AggregateExpr`.

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod literal;
3030
mod negative;
3131
mod no_op;
3232
mod not;
33+
mod placeholder;
3334
mod try_cast;
3435
mod unknown_column;
3536

@@ -55,5 +56,6 @@ pub use literal::{lit, Literal};
5556
pub use negative::{negative, NegativeExpr};
5657
pub use no_op::NoOp;
5758
pub use not::{not, NotExpr};
59+
pub use placeholder::{placeholder, resolve_placeholders, resolve_placeholders_seq};
5860
pub use try_cast::{try_cast, TryCastExpr};
5961
pub use unknown_column::UnKnownColumn;

0 commit comments

Comments
 (0)