Support physical plan reusage #2

Merged: 6 commits, Feb 10, 2025
97 changes: 87 additions & 10 deletions .github/workflows/rust.yml
@@ -42,6 +42,10 @@ jobs:
# Check crate compiles
linux-build-lib:
name: cargo check
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: ubuntu-latest
container:
image: amd64/rust
@@ -133,6 +137,10 @@ jobs:
# Run tests
linux-test:
name: cargo test (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -144,14 +152,18 @@ jobs:
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Run tests (excluding doctests)
run: cargo test --lib --tests --bins --features avro,json,backtrace
- name: Verify Working Directory Clean
run: git diff --exit-code

linux-test-datafusion-cli:
name: cargo test datafusion-cli (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -173,6 +185,10 @@ jobs:

linux-test-example:
name: cargo examples (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -199,6 +215,10 @@ jobs:
# Run `cargo test doc` (test documentation examples)
linux-test-doc:
name: cargo test doc (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -222,6 +242,10 @@ jobs:
# Run `cargo doc` to ensure the rustdoc is clean
linux-rustdoc:
name: cargo doc
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -237,6 +261,7 @@ jobs:

linux-wasm-pack:
name: build with wasm-pack
if: false
runs-on: ubuntu-latest
container:
image: amd64/rust
@@ -255,6 +280,10 @@ jobs:
# verify that the benchmark queries return the correct results
verify-benchmark-results:
name: verify benchmark results (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -286,6 +315,10 @@ jobs:

sqllogictest-postgres:
name: "Run sqllogictest with Postgres runner"
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
services:
@@ -317,6 +350,10 @@ jobs:

windows:
name: cargo test (win64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
@@ -334,22 +371,30 @@ jobs:

macos:
name: cargo test (macos)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: macos-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Setup Rust toolchain
uses: ./.github/actions/setup-macos-builder
- name: Run tests (excluding doctests)
shell: bash
run: |
cargo test --lib --tests --bins --features avro,json,backtrace
cd datafusion-cli
cargo test --lib --tests --bins --all-features

macos-aarch64:
name: cargo test (macos-aarch64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: macos-14
steps:
- uses: actions/checkout@v4
@@ -366,6 +411,10 @@ jobs:

test-datafusion-pyarrow:
name: cargo test pyarrow (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-20.04
container:
@@ -390,6 +439,10 @@ jobs:

vendor:
name: Verify Vendored Code
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: ubuntu-latest
container:
image: amd64/rust
@@ -405,6 +458,10 @@ jobs:

check-fmt:
name: Check cargo fmt
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: ubuntu-latest
container:
image: amd64/rust
@@ -463,6 +520,10 @@ jobs:

clippy:
name: clippy
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -483,6 +544,10 @@ jobs:
# Check answers are correct when hash values collide
hash-collisions:
name: cargo test hash collisions (amd64)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -502,6 +567,10 @@ jobs:

cargo-toml-formatting-checks:
name: check Cargo.toml formatting
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -522,6 +591,10 @@ jobs:

config-docs-check:
name: check configs.md is up-to-date
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -550,6 +623,10 @@ jobs:
# - datafusion-cli
msrv:
name: Verify MSRV (Min Supported Rust Version)
if: (github.event_name == 'push') ||
(github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository) ||
(github.event_name == 'workflow_dispatch')
runs-on: ubuntu-latest
container:
image: amd64/rust
@@ -567,19 +644,19 @@ jobs:
# (Min Supported Rust Version) than the one specified in the
# `rust-version` key of `Cargo.toml`.
#
# To reproduce:
# 1. Install the version of Rust that is failing. Example:
# rustup install 1.76.0
# 2. Run the command that failed with that version. Example:
# cargo +1.76.0 check -p datafusion
#
# To resolve, either:
# 1. Change your code to use older Rust features,
# 2. Revert dependency update
# 3. Update the MSRV version in `Cargo.toml`
#
# Please see the DataFusion Rust Version Compatibility Policy before
# updating Cargo.toml. You may have to update the code instead.
# https://github.com/apache/datafusion/blob/main/README.md#rust-version-compatibility-policy
cargo msrv --output-format json --log-target stdout verify
- name: Check datafusion-substrait
@@ -590,4 +667,4 @@ jobs:
run: cargo msrv --output-format json --log-target stdout verify
- name: Check datafusion-cli
working-directory: datafusion-cli
run: cargo msrv --output-format json --log-target stdout verify
5 changes: 3 additions & 2 deletions benchmarks/src/tpch/run.rs
@@ -219,11 +219,12 @@ impl RunOpt {
displayable(physical_plan.as_ref()).indent(true)
);
}
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
let task_ctx = state.task_ctx();
let result = collect(physical_plan.clone(), Arc::clone(&task_ctx)).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref(), task_ctx)
.indent(true)
);
if !result.is_empty() {
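The change above builds the `TaskContext` once and reuses it both to execute the plan and to render its metrics afterwards. A minimal sketch of that pattern, assuming the two-argument `DisplayableExecutionPlan::with_metrics(plan, task_ctx)` introduced by this PR (upstream DataFusion reads metrics from the plan itself); `run_and_report` is a hypothetical helper:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
use datafusion::prelude::SessionContext;

// One TaskContext is created, its Arc is cloned for execution, and the same
// context is handed to the metrics display afterwards, leaving the plan
// itself untouched so it can be executed again.
async fn run_and_report(
    ctx: &SessionContext,
    physical_plan: Arc<dyn ExecutionPlan>,
) -> Result<()> {
    // Plan structure is available before execution.
    println!(
        "=== Physical plan ===\n{}\n",
        displayable(physical_plan.as_ref()).indent(true)
    );

    let task_ctx = ctx.task_ctx();
    let batches = collect(Arc::clone(&physical_plan), Arc::clone(&task_ctx)).await?;

    // Metrics are collected during execution; per this PR they are looked up
    // through the TaskContext rather than stored on the plan nodes.
    println!(
        "=== Physical plan with metrics ===\n{}\n",
        DisplayableExecutionPlan::with_metrics(physical_plan.as_ref(), task_ctx).indent(true)
    );
    println!("collected {} record batches", batches.len());
    Ok(())
}
```

The benchmark change follows the same order: display the bare plan, execute with a clone of the `Arc<TaskContext>`, then display the plan with metrics drawn from that same context.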
1 change: 1 addition & 0 deletions datafusion-cli/tests/cli_integration.rs
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#![allow(unexpected_cfgs)]

use std::process::Command;

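For context, `unexpected_cfgs` (warn-by-default since Rust 1.80) fires whenever a `#[cfg(...)]` condition is not declared to the compiler; the crate-level `allow` added above silences it for this test file. A hedged illustration of the kind of gate that would otherwise trip the lint; the `custom_integration` cfg name and test names are made up for the example:

```rust
#![allow(unexpected_cfgs)]

// Without the allow (or a check-cfg declaration under `[lints.rust]` in
// Cargo.toml), newer toolchains would warn that `custom_integration` is not
// a known cfg condition.
#[cfg(custom_integration)]
#[test]
fn runs_only_when_compiled_with_cfg_custom_integration() {
    // Compiled only when `--cfg custom_integration` is passed to rustc.
    assert_eq!(2 + 2, 4);
}

#[test]
fn always_runs() {
    assert!(true);
}
```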
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_parquet_index.rs
@@ -28,6 +28,7 @@ use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::metrics::ExecutionPlanMetricsSet;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
@@ -39,7 +40,6 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
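This PR relocates the metrics types from `datafusion::physical_plan::metrics` to `datafusion::execution::metrics`; the same import swap appears in `csv_opener.rs` and `json_opener.rs` below. A small sketch of using the relocated type, assuming the rest of the metrics builder API (`MetricBuilder`, `Count`) moved along with it unchanged; the counter name and value are illustrative:

```rust
use datafusion::execution::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

// Build a per-node metrics set and register a counter for one partition.
fn make_metrics(partition: usize) -> ExecutionPlanMetricsSet {
    let metrics = ExecutionPlanMetricsSet::new();
    let bytes_scanned = MetricBuilder::new(&metrics).counter("bytes_scanned", partition);
    bytes_scanned.add(1024);
    metrics
}
```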
7 changes: 7 additions & 0 deletions datafusion-examples/examples/advanced_udwf.rs
@@ -80,6 +80,13 @@ impl WindowUDFImpl for SmoothItUdf {
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(MyPartitionEvaluator::new()))
}

fn resolve_placeholders(
&self,
_param_values: &Option<datafusion_common::ParamValues>,
) -> Result<Option<std::sync::Arc<dyn WindowUDFImpl>>> {
Ok(None)
}
}

/// This implements the lowest level evaluation for a window function
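The new `resolve_placeholders` hook added above receives the query's `ParamValues` and may return a replacement `WindowUDFImpl`; the body shown for `SmoothItUdf` returns `Ok(None)`, meaning there is nothing to resolve. A rough sketch of what a non-trivial resolution might look like, assuming a hypothetical UDF that carries an `alpha` placeholder and that `ParamValues::Map` keys match the placeholder name; the struct, field, method, and key names are illustrative, and a real implementation would implement the full trait and return `Arc<dyn WindowUDFImpl>`:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion_common::{ParamValues, ScalarValue};

/// Hypothetical window UDF that keeps a smoothing factor as an unresolved
/// placeholder until parameter values are known.
#[derive(Debug, Clone)]
struct SmoothItWithAlpha {
    alpha: Option<ScalarValue>,
}

impl SmoothItWithAlpha {
    /// Sketch of the hook's body: look the placeholder up in the supplied
    /// parameters and return a specialized copy, or `None` to keep `self`.
    fn resolve_placeholders_sketch(
        &self,
        param_values: &Option<ParamValues>,
    ) -> Result<Option<Arc<SmoothItWithAlpha>>> {
        let Some(ParamValues::Map(map)) = param_values else {
            // No parameters (or positional ones): nothing to resolve.
            return Ok(None);
        };
        Ok(map.get("alpha").map(|value| {
            Arc::new(SmoothItWithAlpha {
                alpha: Some(value.clone()),
            })
        }))
    }
}
```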
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_opener.rs
@@ -26,7 +26,7 @@ use datafusion::{
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
execution::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

2 changes: 1 addition & 1 deletion datafusion-examples/examples/json_opener.rs
@@ -27,7 +27,7 @@ use datafusion::{
physical_plan::{FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
execution::metrics::ExecutionPlanMetricsSet,
};

use futures::StreamExt;
22 changes: 13 additions & 9 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -21,7 +21,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::execution::metrics::MetricValue;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
@@ -52,20 +53,22 @@ async fn main() {
let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
let plan = df.create_physical_plan().await.unwrap();

// Create empty visitor
let mut visitor = ParquetExecVisitor {
file_groups: None,
bytes_scanned: None,
};

// Make sure you execute the plan to collect actual execution statistics.
// For example, the `file_scan_config` is known without executing the plan,
// but `bytes_scanned` would be None if we did not execute.
let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
let task_ctx = ctx.task_ctx();
let mut batch_stream = execute_stream(plan.clone(), Arc::clone(&task_ctx)).unwrap();
while let Some(batch) = batch_stream.next().await {
println!("Batch rows: {}", batch.unwrap().num_rows());
}

// Create empty visitor
let mut visitor = ParquetExecVisitor {
file_groups: None,
bytes_scanned: None,
ctx: task_ctx,
};

visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();

println!(
@@ -85,6 +88,7 @@ async fn main() {
struct ParquetExecVisitor {
file_groups: Option<Vec<Vec<PartitionedFile>>>,
bytes_scanned: Option<MetricValue>,
ctx: Arc<TaskContext>,
}

impl ExecutionPlanVisitor for ParquetExecVisitor {
@@ -99,7 +103,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
if let Some(parquet_exec) = maybe_parquet_exec {
self.file_groups = Some(parquet_exec.base_config().file_groups.clone());

let metrics = match parquet_exec.metrics() {
let metrics = match self.ctx.plan_metrics(plan.as_any()) {
None => return Ok(true),
Some(metrics) => metrics,
};
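With this PR the visitor no longer asks the operator for its metrics (`parquet_exec.metrics()`); it asks the `TaskContext` that ran the plan, via `plan_metrics(plan.as_any())`. A compact sketch of the same idea as a generic visitor, assuming `plan_metrics` returns the node's `MetricsSet` as it does in the example above (upstream DataFusion would call `plan.metrics()` instead); `RowCountVisitor` and `total_rows` are illustrative names:

```rust
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};

/// Sums `output_rows` across every operator by querying the TaskContext
/// for each node's metrics, mirroring the pattern in the example above.
struct RowCountVisitor {
    ctx: Arc<TaskContext>,
    total_output_rows: usize,
}

impl ExecutionPlanVisitor for RowCountVisitor {
    type Error = DataFusionError;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Metrics only exist for nodes that have actually been executed.
        if let Some(metrics) = self.ctx.plan_metrics(plan.as_any()) {
            self.total_output_rows += metrics.output_rows().unwrap_or(0);
        }
        Ok(true) // keep descending into child plans
    }
}

/// Call after the plan has been fully executed with `task_ctx`.
fn total_rows(plan: &Arc<dyn ExecutionPlan>, task_ctx: Arc<TaskContext>) -> Result<usize> {
    let mut visitor = RowCountVisitor {
        ctx: task_ctx,
        total_output_rows: 0,
    };
    visit_execution_plan(plan.as_ref(), &mut visitor)?;
    Ok(visitor.total_output_rows)
}
```

Keeping metrics on the `TaskContext` rather than on the plan nodes appears to be what the PR title's plan reuse relies on: the physical plan itself stays read-only across executions.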