Skip to content

Commit 0551d50

Browse files
authored
Merge branch 'main' into fix-predicate-pushdown-bug
2 parents b6bab83 + 7002a00 commit 0551d50

File tree

147 files changed

+1280
-1014
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

147 files changed

+1280
-1014
lines changed

datafusion-examples/examples/advanced_udaf.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use arrow::array::{
2525
};
2626
use arrow::datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type};
2727
use arrow::record_batch::RecordBatch;
28+
use arrow_schema::FieldRef;
2829
use datafusion::common::{cast::as_float64_array, ScalarValue};
2930
use datafusion::error::Result;
3031
use datafusion::logical_expr::{
@@ -92,10 +93,10 @@ impl AggregateUDFImpl for GeoMeanUdaf {
9293
}
9394

9495
/// This is the description of the state. accumulator's state() must match the types here.
95-
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
96+
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
9697
Ok(vec![
97-
Field::new("prod", args.return_type().clone(), true),
98-
Field::new("n", DataType::UInt32, true),
98+
Field::new("prod", args.return_type().clone(), true).into(),
99+
Field::new("n", DataType::UInt32, true).into(),
99100
])
100101
}
101102

@@ -401,7 +402,7 @@ impl AggregateUDFImpl for SimplifiedGeoMeanUdaf {
401402
unimplemented!("should not be invoked")
402403
}
403404

404-
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
405+
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
405406
unimplemented!("should not be invoked")
406407
}
407408

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow::{
2323
array::{ArrayRef, AsArray, Float64Array},
2424
datatypes::Float64Type,
2525
};
26+
use arrow_schema::FieldRef;
2627
use datafusion::common::ScalarValue;
2728
use datafusion::error::Result;
2829
use datafusion::functions_aggregate::average::avg_udaf;
@@ -87,8 +88,8 @@ impl WindowUDFImpl for SmoothItUdf {
8788
Ok(Box::new(MyPartitionEvaluator::new()))
8889
}
8990

90-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
91-
Ok(Field::new(field_args.name(), DataType::Float64, true))
91+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
92+
Ok(Field::new(field_args.name(), DataType::Float64, true).into())
9293
}
9394
}
9495

@@ -205,8 +206,8 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
205206
Some(Box::new(simplify))
206207
}
207208

208-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
209-
Ok(Field::new(field_args.name(), DataType::Float64, true))
209+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
210+
Ok(Field::new(field_args.name(), DataType::Float64, true).into())
210211
}
211212
}
212213

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ fn test_update_matching_exprs() -> Result<()> {
128128
Arc::new(Column::new("b", 1)),
129129
)),
130130
],
131-
Field::new("f", DataType::Int32, true),
131+
Field::new("f", DataType::Int32, true).into(),
132132
)),
133133
Arc::new(CaseExpr::try_new(
134134
Some(Arc::new(Column::new("d", 2))),
@@ -193,7 +193,7 @@ fn test_update_matching_exprs() -> Result<()> {
193193
Arc::new(Column::new("b", 1)),
194194
)),
195195
],
196-
Field::new("f", DataType::Int32, true),
196+
Field::new("f", DataType::Int32, true).into(),
197197
)),
198198
Arc::new(CaseExpr::try_new(
199199
Some(Arc::new(Column::new("d", 3))),
@@ -261,7 +261,7 @@ fn test_update_projected_exprs() -> Result<()> {
261261
Arc::new(Column::new("b", 1)),
262262
)),
263263
],
264-
Field::new("f", DataType::Int32, true),
264+
Field::new("f", DataType::Int32, true).into(),
265265
)),
266266
Arc::new(CaseExpr::try_new(
267267
Some(Arc::new(Column::new("d", 2))),
@@ -326,7 +326,7 @@ fn test_update_projected_exprs() -> Result<()> {
326326
Arc::new(Column::new("b_new", 1)),
327327
)),
328328
],
329-
Field::new("f", DataType::Int32, true),
329+
Field::new("f", DataType::Int32, true).into(),
330330
)),
331331
Arc::new(CaseExpr::try_new(
332332
Some(Arc::new(Column::new("d_new", 3))),

datafusion/core/tests/user_defined/user_defined_aggregates.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use arrow::array::{
3232
StringArray, StructArray, UInt64Array,
3333
};
3434
use arrow::datatypes::{Fields, Schema};
35+
use arrow_schema::FieldRef;
3536
use datafusion::common::test_util::batches_to_string;
3637
use datafusion::dataframe::DataFrame;
3738
use datafusion::datasource::MemTable;
@@ -572,7 +573,7 @@ impl TimeSum {
572573
// Returns the same type as its input
573574
let return_type = timestamp_type.clone();
574575

575-
let state_fields = vec![Field::new("sum", timestamp_type, true)];
576+
let state_fields = vec![Field::new("sum", timestamp_type, true).into()];
576577

577578
let volatility = Volatility::Immutable;
578579

@@ -672,7 +673,7 @@ impl FirstSelector {
672673
let state_fields = state_type
673674
.into_iter()
674675
.enumerate()
675-
.map(|(i, t)| Field::new(format!("{i}"), t, true))
676+
.map(|(i, t)| Field::new(format!("{i}"), t, true).into())
676677
.collect::<Vec<_>>();
677678

678679
// Possible input signatures
@@ -932,9 +933,10 @@ impl AggregateUDFImpl for MetadataBasedAggregateUdf {
932933
unimplemented!("this should never be called since return_field is implemented");
933934
}
934935

935-
fn return_field(&self, _arg_fields: &[Field]) -> Result<Field> {
936+
fn return_field(&self, _arg_fields: &[FieldRef]) -> Result<FieldRef> {
936937
Ok(Field::new(self.name(), DataType::UInt64, true)
937-
.with_metadata(self.metadata.clone()))
938+
.with_metadata(self.metadata.clone())
939+
.into())
938940
}
939941

940942
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {

datafusion/core/tests/user_defined/user_defined_scalar_functions.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use arrow::array::{
2828
use arrow::compute::kernels::numeric::add;
2929
use arrow::datatypes::{DataType, Field, Schema};
3030
use arrow_schema::extension::{Bool8, CanonicalExtensionType, ExtensionType};
31-
use arrow_schema::ArrowError;
31+
use arrow_schema::{ArrowError, FieldRef};
3232
use datafusion::common::test_util::batches_to_string;
3333
use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState};
3434
use datafusion::prelude::*;
@@ -814,7 +814,7 @@ impl ScalarUDFImpl for TakeUDF {
814814
///
815815
/// 1. If the third argument is '0', return the type of the first argument
816816
/// 2. If the third argument is '1', return the type of the second argument
817-
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<Field> {
817+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
818818
if args.arg_fields.len() != 3 {
819819
return plan_err!("Expected 3 arguments, got {}.", args.arg_fields.len());
820820
}
@@ -845,7 +845,8 @@ impl ScalarUDFImpl for TakeUDF {
845845
self.name(),
846846
args.arg_fields[take_idx].data_type().to_owned(),
847847
true,
848-
))
848+
)
849+
.into())
849850
}
850851

851852
// The actual implementation
@@ -1412,9 +1413,10 @@ impl ScalarUDFImpl for MetadataBasedUdf {
14121413
);
14131414
}
14141415

1415-
fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<Field> {
1416+
fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
14161417
Ok(Field::new(self.name(), DataType::UInt64, true)
1417-
.with_metadata(self.metadata.clone()))
1418+
.with_metadata(self.metadata.clone())
1419+
.into())
14181420
}
14191421

14201422
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
@@ -1562,14 +1564,15 @@ impl ScalarUDFImpl for ExtensionBasedUdf {
15621564
Ok(DataType::Utf8)
15631565
}
15641566

1565-
fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<Field> {
1567+
fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
15661568
Ok(Field::new("canonical_extension_udf", DataType::Utf8, true)
1567-
.with_extension_type(MyUserExtentionType {}))
1569+
.with_extension_type(MyUserExtentionType {})
1570+
.into())
15681571
}
15691572

15701573
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
15711574
assert_eq!(args.arg_fields.len(), 1);
1572-
let input_field = args.arg_fields[0];
1575+
let input_field = args.arg_fields[0].as_ref();
15731576

15741577
let output_as_bool = matches!(
15751578
CanonicalExtensionType::try_from(input_field),

datafusion/core/tests/user_defined/user_defined_window_functions.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow::array::{
2323
UInt64Array,
2424
};
2525
use arrow::datatypes::{DataType, Field, Schema};
26+
use arrow_schema::FieldRef;
2627
use datafusion::common::test_util::batches_to_string;
2728
use datafusion::common::{Result, ScalarValue};
2829
use datafusion::prelude::SessionContext;
@@ -564,8 +565,8 @@ impl OddCounter {
564565
&self.aliases
565566
}
566567

567-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
568-
Ok(Field::new(field_args.name(), DataType::Int64, true))
568+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
569+
Ok(Field::new(field_args.name(), DataType::Int64, true).into())
569570
}
570571
}
571572

@@ -683,7 +684,7 @@ impl WindowUDFImpl for VariadicWindowUDF {
683684
unimplemented!("unnecessary for testing");
684685
}
685686

686-
fn field(&self, _: WindowUDFFieldArgs) -> Result<Field> {
687+
fn field(&self, _: WindowUDFFieldArgs) -> Result<FieldRef> {
687688
unimplemented!("unnecessary for testing");
688689
}
689690
}
@@ -809,9 +810,10 @@ impl WindowUDFImpl for MetadataBasedWindowUdf {
809810
Ok(Box::new(MetadataBasedPartitionEvaluator { double_output }))
810811
}
811812

812-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
813+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
813814
Ok(Field::new(field_args.name(), DataType::UInt64, true)
814-
.with_metadata(self.metadata.clone()))
815+
.with_metadata(self.metadata.clone())
816+
.into())
815817
}
816818
}
817819

datafusion/expr-common/src/type_coercion/aggregates.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::signature::TypeSignature;
1919
use arrow::datatypes::{
20-
DataType, Field, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
20+
DataType, FieldRef, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
2121
DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE,
2222
};
2323

@@ -89,7 +89,7 @@ pub static TIMES: &[DataType] = &[
8989
/// number of input types.
9090
pub fn check_arg_count(
9191
func_name: &str,
92-
input_fields: &[Field],
92+
input_fields: &[FieldRef],
9393
signature: &TypeSignature,
9494
) -> Result<()> {
9595
match signature {

datafusion/expr/src/expr.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::logical_plan::Subquery;
2828
use crate::Volatility;
2929
use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};
3030

31-
use arrow::datatypes::{DataType, Field, FieldRef};
31+
use arrow::datatypes::{DataType, FieldRef};
3232
use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable};
3333
use datafusion_common::tree_node::{
3434
Transformed, TransformedResult, TreeNode, TreeNodeContainer, TreeNodeRecursion,
@@ -846,10 +846,10 @@ impl WindowFunctionDefinition {
846846
/// Returns the datatype of the window function
847847
pub fn return_field(
848848
&self,
849-
input_expr_fields: &[Field],
849+
input_expr_fields: &[FieldRef],
850850
_input_expr_nullable: &[bool],
851851
display_name: &str,
852-
) -> Result<Field> {
852+
) -> Result<FieldRef> {
853853
match self {
854854
WindowFunctionDefinition::AggregateUDF(fun) => {
855855
fun.return_field(input_expr_fields)

datafusion/expr/src/expr_fn.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::{
3737
use arrow::compute::kernels::cast_utils::{
3838
parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month,
3939
};
40-
use arrow::datatypes::{DataType, Field};
40+
use arrow::datatypes::{DataType, Field, FieldRef};
4141
use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference};
4242
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
4343
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
@@ -492,6 +492,7 @@ pub fn create_udaf(
492492
.into_iter()
493493
.enumerate()
494494
.map(|(i, t)| Field::new(format!("{i}"), t, true))
495+
.map(Arc::new)
495496
.collect::<Vec<_>>();
496497
AggregateUDF::from(SimpleAggregateUDF::new(
497498
name,
@@ -510,7 +511,7 @@ pub struct SimpleAggregateUDF {
510511
signature: Signature,
511512
return_type: DataType,
512513
accumulator: AccumulatorFactoryFunction,
513-
state_fields: Vec<Field>,
514+
state_fields: Vec<FieldRef>,
514515
}
515516

516517
impl Debug for SimpleAggregateUDF {
@@ -533,7 +534,7 @@ impl SimpleAggregateUDF {
533534
return_type: DataType,
534535
volatility: Volatility,
535536
accumulator: AccumulatorFactoryFunction,
536-
state_fields: Vec<Field>,
537+
state_fields: Vec<FieldRef>,
537538
) -> Self {
538539
let name = name.into();
539540
let signature = Signature::exact(input_type, volatility);
@@ -553,7 +554,7 @@ impl SimpleAggregateUDF {
553554
signature: Signature,
554555
return_type: DataType,
555556
accumulator: AccumulatorFactoryFunction,
556-
state_fields: Vec<Field>,
557+
state_fields: Vec<FieldRef>,
557558
) -> Self {
558559
let name = name.into();
559560
Self {
@@ -590,7 +591,7 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
590591
(self.accumulator)(acc_args)
591592
}
592593

593-
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
594+
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
594595
Ok(self.state_fields.clone())
595596
}
596597
}
@@ -678,12 +679,12 @@ impl WindowUDFImpl for SimpleWindowUDF {
678679
(self.partition_evaluator_factory)()
679680
}
680681

681-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
682-
Ok(Field::new(
682+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
683+
Ok(Arc::new(Field::new(
683684
field_args.name(),
684685
self.return_type.clone(),
685686
true,
686-
))
687+
)))
687688
}
688689
}
689690

0 commit comments

Comments
 (0)