Skip to content

Commit a089eff

Browse files
authored
refactor filter pushdown APIs (#16642)
This commit refactors the filter pushdown infrastructure to improve flexibility, readability, and maintainability:

### Major Changes:

- **Eliminated `PredicateSupports`** wrapper in favor of directly using `Vec<PredicateSupport>`, simplifying APIs.
- Introduced **`ChildFilterDescription::from_child`** to encapsulate logic for determining filter pushdown eligibility per child.
- Added **`FilterDescription::from_children`** to generate pushdown plans based on column references across all children.
- Replaced legacy methods (`all_parent_filters_supported`, etc.) with more flexible, composable APIs using builder-style chaining.
- Updated all relevant nodes (`FilterExec`, `SortExec`, `RepartitionExec`, etc.) to use the new pushdown planning structure.

### Functional Adjustments:

- Ensured filter column indices are reassigned properly when filters are pushed to projected inputs (e.g., in `FilterExec`).
- Standardized handling of supported vs. unsupported filters throughout the propagation pipeline.
- Improved handling of self-filters in nodes such as `FilterExec` and `SortExec`.

### Optimizer Improvements:

- Clarified pushdown phases (`Pre`, `Post`) and respected them across execution plans.
- Documented the full pushdown lifecycle within `filter_pushdown.rs`, improving discoverability for future contributors.

These changes lay the groundwork for more precise and flexible filter pushdown optimizations and improve the robustness of the optimizer infrastructure.
1 parent e950df5 commit a089eff

File tree

11 files changed

+310
-380
lines changed

11 files changed

+310
-380
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ use datafusion_physical_plan::{
3434
displayable,
3535
filter::FilterExec,
3636
filter_pushdown::{
37-
ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
38-
PredicateSupport, PredicateSupports,
37+
ChildFilterDescription, ChildPushdownResult, FilterDescription,
38+
FilterPushdownPropagation, PredicateSupport,
3939
},
4040
metrics::ExecutionPlanMetricsSet,
4141
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -228,11 +228,19 @@ impl FileSource for TestSource {
228228
..self.clone()
229229
});
230230
Ok(FilterPushdownPropagation {
231-
filters: PredicateSupports::all_supported(filters),
231+
filters: filters
232+
.into_iter()
233+
.map(PredicateSupport::Supported)
234+
.collect(),
232235
updated_node: Some(new_node),
233236
})
234237
} else {
235-
Ok(FilterPushdownPropagation::unsupported(filters))
238+
Ok(FilterPushdownPropagation::with_filters(
239+
filters
240+
.into_iter()
241+
.map(PredicateSupport::Unsupported)
242+
.collect(),
243+
))
236244
}
237245
}
238246

@@ -515,9 +523,12 @@ impl ExecutionPlan for TestNode {
515523
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
516524
_config: &ConfigOptions,
517525
) -> Result<FilterDescription> {
518-
Ok(FilterDescription::new_with_child_count(1)
519-
.all_parent_filters_supported(parent_filters)
520-
.with_self_filter(Arc::clone(&self.predicate)))
526+
// Since TestNode marks all parent filters as supported and adds its own filter,
527+
// we use from_child to create a description with all parent filters supported
528+
let child = &self.input;
529+
let child_desc = ChildFilterDescription::from_child(&parent_filters, child)?
530+
.with_self_filter(Arc::clone(&self.predicate));
531+
Ok(FilterDescription::new().with_child(child_desc))
521532
}
522533

523534
fn handle_child_pushdown_result(
@@ -534,7 +545,7 @@ impl ExecutionPlan for TestNode {
534545
let self_pushdown_result = child_pushdown_result.self_filters[0].clone();
535546
// And pushed down 1 filter
536547
assert_eq!(self_pushdown_result.len(), 1);
537-
let self_pushdown_result = self_pushdown_result.into_inner();
548+
let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect();
538549

539550
match &self_pushdown_result[0] {
540551
PredicateSupport::Unsupported(filter) => {

datafusion/datasource-parquet/src/source.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
4141
use datafusion_physical_expr::conjunction;
4242
use datafusion_physical_expr_common::physical_expr::fmt_sql;
4343
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
44-
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
45-
use datafusion_physical_plan::filter_pushdown::PredicateSupports;
44+
use datafusion_physical_plan::filter_pushdown::{
45+
FilterPushdownPropagation, PredicateSupport,
46+
};
4647
use datafusion_physical_plan::metrics::Count;
4748
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
4849
use datafusion_physical_plan::DisplayFormatType;
@@ -621,7 +622,12 @@ impl FileSource for ParquetSource {
621622
config: &ConfigOptions,
622623
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
623624
let Some(file_schema) = self.file_schema.clone() else {
624-
return Ok(FilterPushdownPropagation::unsupported(filters));
625+
return Ok(FilterPushdownPropagation::with_filters(
626+
filters
627+
.into_iter()
628+
.map(PredicateSupport::Unsupported)
629+
.collect(),
630+
));
625631
};
626632
// Determine if based on configs we should push filters down.
627633
// If either the table / scan itself or the config has pushdown enabled,
@@ -635,20 +641,36 @@ impl FileSource for ParquetSource {
635641
let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
636642

637643
let mut source = self.clone();
638-
let filters = PredicateSupports::new_with_supported_check(filters, |filter| {
639-
can_expr_be_pushed_down_with_schemas(filter, &file_schema)
640-
});
641-
if filters.is_all_unsupported() {
644+
let filters: Vec<PredicateSupport> = filters
645+
.into_iter()
646+
.map(|filter| {
647+
if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
648+
PredicateSupport::Supported(filter)
649+
} else {
650+
PredicateSupport::Unsupported(filter)
651+
}
652+
})
653+
.collect();
654+
if filters
655+
.iter()
656+
.all(|f| matches!(f, PredicateSupport::Unsupported(_)))
657+
{
642658
// No filters can be pushed down, so we can just return the remaining filters
643659
// and avoid replacing the source in the physical plan.
644660
return Ok(FilterPushdownPropagation::with_filters(filters));
645661
}
646-
let allowed_filters = filters.collect_supported();
662+
let allowed_filters = filters
663+
.iter()
664+
.filter_map(|f| match f {
665+
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
666+
PredicateSupport::Unsupported(_) => None,
667+
})
668+
.collect_vec();
647669
let predicate = match source.predicate {
648-
Some(predicate) => conjunction(
649-
std::iter::once(predicate).chain(allowed_filters.iter().cloned()),
650-
),
651-
None => conjunction(allowed_filters.iter().cloned()),
670+
Some(predicate) => {
671+
conjunction(std::iter::once(predicate).chain(allowed_filters))
672+
}
673+
None => conjunction(allowed_filters),
652674
};
653675
source.predicate = Some(predicate);
654676
source = source.with_pushdown_filters(pushdown_filters);
@@ -657,7 +679,10 @@ impl FileSource for ParquetSource {
657679
// even if we updated the predicate to include the filters (they will only be used for stats pruning).
658680
if !pushdown_filters {
659681
return Ok(FilterPushdownPropagation::with_filters(
660-
filters.make_unsupported(),
682+
filters
683+
.into_iter()
684+
.map(|f| PredicateSupport::Unsupported(f.into_inner()))
685+
.collect_vec(),
661686
)
662687
.with_updated_node(source));
663688
}

datafusion/datasource/src/file.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ use arrow::datatypes::SchemaRef;
3030
use datafusion_common::config::ConfigOptions;
3131
use datafusion_common::{not_impl_err, Result, Statistics};
3232
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
33-
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
33+
use datafusion_physical_plan::filter_pushdown::{
34+
FilterPushdownPropagation, PredicateSupport,
35+
};
3436
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3537
use datafusion_physical_plan::DisplayFormatType;
3638

@@ -120,7 +122,12 @@ pub trait FileSource: Send + Sync {
120122
filters: Vec<Arc<dyn PhysicalExpr>>,
121123
_config: &ConfigOptions,
122124
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
123-
Ok(FilterPushdownPropagation::unsupported(filters))
125+
Ok(FilterPushdownPropagation::with_filters(
126+
filters
127+
.into_iter()
128+
.map(PredicateSupport::Unsupported)
129+
.collect(),
130+
))
124131
}
125132

126133
/// Set optional schema adapter factory.

datafusion/datasource/src/source.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3838
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
3939
use datafusion_physical_expr_common::sort_expr::LexOrdering;
4040
use datafusion_physical_plan::filter_pushdown::{
41-
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
41+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
4242
};
4343

4444
/// A source of data, typically a list of files or memory
@@ -168,7 +168,12 @@ pub trait DataSource: Send + Sync + Debug {
168168
filters: Vec<Arc<dyn PhysicalExpr>>,
169169
_config: &ConfigOptions,
170170
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
171-
Ok(FilterPushdownPropagation::unsupported(filters))
171+
Ok(FilterPushdownPropagation::with_filters(
172+
filters
173+
.into_iter()
174+
.map(PredicateSupport::Unsupported)
175+
.collect(),
176+
))
172177
}
173178
}
174179

@@ -316,7 +321,14 @@ impl ExecutionPlan for DataSourceExec {
316321
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
317322
// Push any remaining filters into our data source
318323
let res = self.data_source.try_pushdown_filters(
319-
child_pushdown_result.parent_filters.collect_all(),
324+
child_pushdown_result
325+
.parent_filters
326+
.into_iter()
327+
.map(|f| match f {
328+
PredicateSupport::Supported(expr) => expr,
329+
PredicateSupport::Unsupported(expr) => expr,
330+
})
331+
.collect(),
320332
config,
321333
)?;
322334
match res.updated_node {

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,34 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//! Filter Pushdown Optimization Process
19+
//!
20+
//! The filter pushdown mechanism involves four key steps:
21+
//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22+
//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23+
//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24+
//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
25+
//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26+
//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27+
//! containing information about which filters were successfully pushed down vs. unsupported.
28+
//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29+
//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30+
//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31+
//!
32+
//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription
33+
1834
use std::sync::Arc;
1935

2036
use crate::PhysicalOptimizerRule;
2137

2238
use datafusion_common::{config::ConfigOptions, Result};
2339
use datafusion_physical_expr::PhysicalExpr;
2440
use datafusion_physical_plan::filter_pushdown::{
25-
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
26-
PredicateSupport, PredicateSupports,
41+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
2742
};
2843
use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};
2944

30-
use itertools::izip;
45+
use itertools::{izip, Itertools};
3146

3247
/// Attempts to recursively push given filters from the top of the tree into leafs.
3348
///
@@ -497,10 +512,10 @@ fn push_down_filters(
497512
// Our child doesn't know the difference between filters that were passed down
498513
// from our parents and filters that the current node injected. We need to de-entangle
499514
// this since we do need to distinguish between them.
500-
let mut all_filters = result.filters.into_inner();
515+
let mut all_filters = result.filters.into_iter().collect_vec();
501516
let parent_predicates = all_filters.split_off(num_self_filters);
502517
let self_predicates = all_filters;
503-
self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates));
518+
self_filters_pushdown_supports.push(self_predicates);
504519

505520
for (idx, result) in parent_supported_predicate_indices
506521
.iter()
@@ -533,21 +548,15 @@ fn push_down_filters(
533548
let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
534549
// Remap the result onto the parent filters as they were given to us.
535550
// Any filters that were not pushed down to any children are marked as unsupported.
536-
let parent_pushdown_result = PredicateSupports::new(
537-
parent_predicates_pushdown_states
538-
.into_iter()
539-
.zip(parent_predicates)
540-
.map(|(state, filter)| match state {
541-
ParentPredicateStates::NoChildren => {
542-
PredicateSupport::Unsupported(filter)
543-
}
544-
ParentPredicateStates::Unsupported => {
545-
PredicateSupport::Unsupported(filter)
546-
}
547-
ParentPredicateStates::Supported => PredicateSupport::Supported(filter),
548-
})
549-
.collect(),
550-
);
551+
let parent_pushdown_result = parent_predicates_pushdown_states
552+
.into_iter()
553+
.zip(parent_predicates)
554+
.map(|(state, filter)| match state {
555+
ParentPredicateStates::NoChildren => PredicateSupport::Unsupported(filter),
556+
ParentPredicateStates::Unsupported => PredicateSupport::Unsupported(filter),
557+
ParentPredicateStates::Supported => PredicateSupport::Supported(filter),
558+
})
559+
.collect();
551560
// TODO: by calling `handle_child_pushdown_result` we are assuming that the
552561
// `ExecutionPlan` implementation will not change the plan itself.
553562
// Should we have a separate method for dynamic pushdown that does not allow modifying the plan?

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
234234
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
235235
_config: &ConfigOptions,
236236
) -> Result<FilterDescription> {
237-
Ok(FilterDescription::new_with_child_count(1)
238-
.all_parent_filters_supported(parent_filters))
237+
FilterDescription::from_children(parent_filters, &self.children())
239238
}
240239

241240
fn handle_child_pushdown_result(

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
1919
use crate::filter_pushdown::{
20-
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21-
FilterPushdownPropagation,
20+
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21+
FilterPushdownPropagation, PredicateSupport,
2222
};
2323
pub use crate::metrics::Metric;
2424
pub use crate::ordering::InputOrderMode;
@@ -33,6 +33,7 @@ pub use datafusion_physical_expr::window::WindowExpr;
3333
pub use datafusion_physical_expr::{
3434
expressions, Distribution, Partitioning, PhysicalExpr,
3535
};
36+
use itertools::Itertools;
3637

3738
use std::any::Any;
3839
use std::fmt::Debug;
@@ -520,10 +521,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
520521
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
521522
_config: &ConfigOptions,
522523
) -> Result<FilterDescription> {
523-
Ok(
524-
FilterDescription::new_with_child_count(self.children().len())
525-
.all_parent_filters_unsupported(parent_filters),
526-
)
524+
// Default implementation: mark all filters as unsupported for all children
525+
let mut desc = FilterDescription::new();
526+
let child_filters = parent_filters
527+
.iter()
528+
.map(|f| PredicateSupport::Unsupported(Arc::clone(f)))
529+
.collect_vec();
530+
for _ in 0..self.children().len() {
531+
desc = desc.with_child(ChildFilterDescription {
532+
parent_filters: child_filters.clone(),
533+
self_filters: vec![],
534+
});
535+
}
536+
Ok(desc)
527537
}
528538

529539
/// Handle the result of a child pushdown.
@@ -587,16 +597,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
587597
///
588598
/// **Helper Methods for Customization:**
589599
/// There are various helper methods to simplify implementing this method:
590-
/// - [`FilterPushdownPropagation::unsupported`]: Indicates that the node
591-
/// does not support filter pushdown at all, rejecting all filters.
592600
/// - [`FilterPushdownPropagation::transparent`]: Indicates that the node
593601
/// supports filter pushdown but does not modify it, simply transmitting
594602
/// the children's pushdown results back up to its parent.
595-
/// - [`PredicateSupports::new_with_supported_check`]: Takes a callback to
596-
/// dynamically determine support for each filter, useful with
597-
/// [`FilterPushdownPropagation::with_filters`] and
598-
/// [`FilterPushdownPropagation::with_updated_node`] to build mixed results
599-
/// of supported and unsupported filters.
603+
/// - [`FilterPushdownPropagation::with_filters`]: Allows adding filters
604+
/// to the propagation result, indicating which filters are supported by
605+
/// the current node.
606+
/// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the
607+
/// current node in the propagation result, used if the node
608+
/// has modified its plan based on the pushdown results.
600609
///
601610
/// **Filter Pushdown Phases:**
602611
/// There are two different phases in filter pushdown (`Pre` and `Post`),
@@ -605,7 +614,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
605614
/// [`FilterPushdownPhase`] for more details on phase-specific behavior.
606615
///
607616
/// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
608-
/// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
609617
fn handle_child_pushdown_result(
610618
&self,
611619
_phase: FilterPushdownPhase,

0 commit comments

Comments
 (0)