Skip to content

Commit 096d55f

Browse files
committed
chore(cubestore): Upgrade DF: Deduplicate and simplify SessionContext creation
1 parent fbf9ae8 commit 096d55f

File tree

6 files changed

+55
-84
lines changed

6 files changed

+55
-84
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 28 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ pub mod optimizations;
33
pub mod panic;
44
mod partition_filter;
55
mod planning;
6+
use datafusion::execution::runtime_env::RuntimeEnv;
67
use datafusion::logical_expr::planner::ExprPlanner;
78
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
89
use datafusion_datasource::memory::MemorySourceConfig;
@@ -19,7 +20,6 @@ mod topk;
1920
pub mod trace_data_loaded;
2021
use serialized_plan::PreSerializedPlan;
2122
pub use topk::MIN_TOPK_STREAM_ROWS;
22-
use udfs::{registerable_aggregate_udfs, registerable_scalar_udfs};
2323
mod filter_by_key_range;
2424
pub mod info_schema;
2525
pub mod merge_sort;
@@ -56,6 +56,7 @@ use crate::queryplanner::topk::ClusterAggregateTopKLower;
5656
use crate::queryplanner::metadata_cache::MetadataCacheFactory;
5757
use crate::queryplanner::optimizations::rolling_optimizer::RollingOptimizerRule;
5858
use crate::queryplanner::pretty_printers::{pp_plan_ext, PPOptions};
59+
use crate::queryplanner::udfs::{registerable_aggregate_udfs_iter, registerable_scalar_udfs_iter};
5960
use crate::sql::cache::SqlResultCache;
6061
use crate::sql::InlineTables;
6162
use crate::store::DataFrame;
@@ -72,7 +73,7 @@ use datafusion::common::{plan_datafusion_err, TableReference};
7273
use datafusion::config::ConfigOptions;
7374
use datafusion::datasource::{provider_as_source, TableType};
7475
use datafusion::error::DataFusionError;
75-
use datafusion::execution::{SessionState, TaskContext};
76+
use datafusion::execution::{SessionState, SessionStateBuilder, TaskContext};
7677
use datafusion::logical_expr::{
7778
AggregateUDF, Expr, Extension, LogicalPlan, ScalarUDF, TableProviderFilterPushDown,
7879
TableSource, WindowUDF,
@@ -280,22 +281,39 @@ impl QueryPlannerImpl {
280281
}
281282

282283
impl QueryPlannerImpl {
284+
/// Has the user defined functions to define query language behavior, but might exclude Cube
285+
/// optimizer rules or other parameters affecting execution performance. This is used by
286+
/// `QueryPlannerImpl::make_execution_context`.
287+
pub fn minimal_session_state_from_final_config(config: SessionConfig) -> SessionStateBuilder {
288+
let mut state_builder = SessionStateBuilder::new()
289+
.with_config(config)
290+
.with_runtime_env(Arc::new(RuntimeEnv::default()))
291+
.with_default_features();
292+
state_builder
293+
.aggregate_functions()
294+
.get_or_insert_default()
295+
.extend(registerable_aggregate_udfs_iter().map(Arc::new));
296+
state_builder
297+
.scalar_functions()
298+
.get_or_insert_default()
299+
.extend(registerable_scalar_udfs_iter().map(Arc::new));
300+
state_builder
301+
}
302+
283303
pub fn make_execution_context(mut config: SessionConfig) -> SessionContext {
284304
// The config parameter is from metadata_cache_factory (which we need to rename) but doesn't
285305
// include all necessary configs.
286306
config
287307
.options_mut()
288308
.execution
289309
.dont_parallelize_sort_preserving_merge_exec_inputs = true;
290-
let context = SessionContext::new_with_config(config);
310+
291311
// TODO upgrade DF: build SessionContexts consistently
292-
for udaf in registerable_aggregate_udfs() {
293-
context.register_udaf(udaf);
294-
}
295-
for udf in registerable_scalar_udfs() {
296-
context.register_udf(udf);
297-
}
298-
context.add_optimizer_rule(Arc::new(RollingOptimizerRule {}));
312+
let state = Self::minimal_session_state_from_final_config(config)
313+
.with_optimizer_rule(Arc::new(RollingOptimizerRule {}))
314+
.build();
315+
316+
let context = SessionContext::new_with_state(state);
299317

300318
// TODO upgrade DF
301319
// context

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 2 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -46,8 +46,7 @@ use datafusion::datasource::physical_plan::{
4646
use datafusion::datasource::{TableProvider, TableType};
4747
use datafusion::error::DataFusionError;
4848
use datafusion::error::Result as DFResult;
49-
use datafusion::execution::runtime_env::RuntimeEnv;
50-
use datafusion::execution::{SessionStateBuilder, TaskContext};
49+
use datafusion::execution::TaskContext;
5150
use datafusion::logical_expr::{Expr, LogicalPlan};
5251
use datafusion::physical_expr;
5352
use datafusion::physical_expr::LexOrdering;
@@ -96,7 +95,6 @@ use std::time::SystemTime;
9695
use tracing::{instrument, Instrument};
9796

9897
use super::serialized_plan::PreSerializedPlan;
99-
use super::udfs::{registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs};
10098
use super::{try_make_memory_data_source, QueryPlannerImpl};
10199

102100
#[automock]
@@ -444,15 +442,9 @@ impl QueryExecutorImpl {
444442
&self,
445443
query_planner: CubeQueryPlanner,
446444
) -> Result<Arc<SessionContext>, CubeError> {
447-
let runtime = Arc::new(RuntimeEnv::default());
448445
let config = self.session_config();
449-
let session_state = SessionStateBuilder::new()
450-
.with_config(config)
451-
.with_runtime_env(runtime)
452-
.with_default_features()
446+
let session_state = QueryPlannerImpl::minimal_session_state_from_final_config(config)
453447
.with_query_planner(Arc::new(query_planner))
454-
.with_aggregate_functions(registerable_arc_aggregate_udfs())
455-
.with_scalar_functions(registerable_arc_scalar_udfs())
456448
.with_physical_optimizer_rules(self.physical_optimizer_rules())
457449
.build();
458450
let ctx = SessionContext::new_with_state(session_state);

rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs

Lines changed: 11 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
use super::udfs::{registerable_aggregate_udfs, registerable_scalar_udfs};
21
use crate::metastore::table::{Table, TablePath};
32
use crate::metastore::{Chunk, IdRow, Index, Partition};
43
use crate::queryplanner::panic::PanicWorkerNode;
@@ -7,7 +6,9 @@ use crate::queryplanner::providers::InfoSchemaQueryCacheTableProvider;
76
use crate::queryplanner::query_executor::{CubeTable, InlineTableId, InlineTableProvider};
87
use crate::queryplanner::rolling::RollingWindowAggregate;
98
use crate::queryplanner::topk::{ClusterAggregateTopKLower, ClusterAggregateTopKUpper};
10-
use crate::queryplanner::{pretty_printers, CubeTableLogical, InfoSchemaTableProvider};
9+
use crate::queryplanner::{
10+
pretty_printers, CubeTableLogical, InfoSchemaTableProvider, QueryPlannerImpl,
11+
};
1112
use crate::table::Row;
1213
use crate::CubeError;
1314
use datafusion::arrow::datatypes::SchemaRef;
@@ -27,7 +28,7 @@ use datafusion::logical_expr::{
2728
Projection, RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union,
2829
Unnest, Values, Window,
2930
};
30-
use datafusion::prelude::SessionContext;
31+
use datafusion::prelude::{SessionConfig, SessionContext};
3132
use datafusion_proto::bytes::logical_plan_from_bytes_with_extension_codec;
3233
use datafusion_proto::logical_plan::LogicalExtensionCodec;
3334
use std::collections::HashMap;
@@ -1037,16 +1038,13 @@ impl SerializedPlan {
10371038
chunk_id_to_record_batches: HashMap<u64, Vec<RecordBatch>>,
10381039
parquet_metadata_cache: Arc<dyn ParquetFileReaderFactory>,
10391040
) -> Result<LogicalPlan, CubeError> {
1040-
// TODO DF upgrade SessionContext::new()
1041-
// After this comment was made, we now register_udaf... what else?
1042-
let session_context = SessionContext::new();
1043-
// TODO DF upgrade: consistently build SessionContexts/register udafs/udfs.
1044-
for udaf in registerable_aggregate_udfs() {
1045-
session_context.register_udaf(udaf);
1046-
}
1047-
for udf in registerable_scalar_udfs() {
1048-
session_context.register_udf(udf);
1049-
}
1041+
// TODO upgrade DF: We might avoid constructing so many one-time-use SessionContexts.
1042+
1043+
// We need registered Cube UDFs and UDAFs (and there are no UDWFs) to deserialize the plan,
1044+
// but not much else.
1045+
let session_context = SessionContext::new_with_state(
1046+
QueryPlannerImpl::minimal_session_state_from_final_config(SessionConfig::new()).build(),
1047+
);
10501048

10511049
let logical_plan = logical_plan_from_bytes_with_extension_codec(
10521050
self.logical_plan.as_slice(),

rust/cubestore/cubestore/src/queryplanner/topk/plan.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@ use crate::queryplanner::topk::execute::{AggregateTopKExec, TopKAggregateFunctio
33
use crate::queryplanner::topk::{
44
ClusterAggregateTopKLower, ClusterAggregateTopKUpper, SortColumn, MIN_TOPK_STREAM_ROWS,
55
};
6-
use crate::queryplanner::udfs::{scalar_udf_by_kind, CubeScalarUDFKind, HllCardinality};
6+
use crate::queryplanner::udfs::HllCardinality;
77
use datafusion::arrow::compute::SortOptions;
88
use datafusion::arrow::datatypes::{DataType, Schema};
99
use datafusion::common::tree_node::{Transformed, TreeNode};
@@ -21,7 +21,8 @@ use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
2121

2222
use datafusion::common::{DFSchema, DFSchemaRef, Spans};
2323
use datafusion::logical_expr::{
24-
Aggregate, Extension, FetchType, Filter, Limit, LogicalPlan, Projection, SkipType, SortExpr,
24+
Aggregate, Extension, FetchType, Filter, Limit, LogicalPlan, Projection, ScalarUDF, SkipType,
25+
SortExpr,
2526
};
2627
use datafusion::physical_planner::{create_aggregate_expr_and_maybe_filter, PhysicalPlanner};
2728
use datafusion::prelude::Expr;
@@ -678,7 +679,7 @@ pub fn make_sort_expr(
678679
// schema in create_physical_expr.
679680
match fun {
680681
TopKAggregateFunction::Merge => {
681-
let udf = scalar_udf_by_kind(CubeScalarUDFKind::HllCardinality);
682+
let udf = Arc::new(ScalarUDF::new_from_impl(HllCardinality::new()));
682683
Arc::new(ScalarFunctionExpr::new(
683684
HllCardinality::static_name(),
684685
udf,

rust/cubestore/cubestore/src/queryplanner/udfs.rs

Lines changed: 7 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -21,45 +21,16 @@ use serde_derive::{Deserialize, Serialize};
2121
use std::any::Any;
2222
use std::sync::Arc;
2323

24-
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
25-
pub enum CubeScalarUDFKind {
26-
HllCardinality, // cardinality(), accepting the HyperLogLog sketches.
27-
UnixTimestamp,
28-
DateAdd,
29-
DateSub,
30-
DateBin,
31-
ConvertTz,
32-
}
33-
34-
pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Arc<ScalarUDF> {
35-
match k {
36-
CubeScalarUDFKind::HllCardinality => Arc::new(HllCardinality::descriptor()),
37-
CubeScalarUDFKind::UnixTimestamp => {
38-
Arc::new(ScalarUDF::new_from_impl(UnixTimestamp::new()))
39-
}
40-
CubeScalarUDFKind::DateAdd => Arc::new(ScalarUDF::new_from_impl(DateAddSub::new_add())),
41-
CubeScalarUDFKind::DateSub => Arc::new(ScalarUDF::new_from_impl(DateAddSub::new_sub())),
42-
CubeScalarUDFKind::DateBin => Arc::new(ScalarUDF::new_from_impl(DateBin::new())),
43-
CubeScalarUDFKind::ConvertTz => Arc::new(ScalarUDF::new_from_impl(ConvertTz::new())),
44-
}
45-
}
46-
47-
pub fn registerable_scalar_udfs() -> Vec<ScalarUDF> {
48-
vec![
49-
HllCardinality::descriptor(),
24+
pub fn registerable_scalar_udfs_iter() -> impl Iterator<Item = ScalarUDF> {
25+
[
26+
ScalarUDF::new_from_impl(HllCardinality::new()),
5027
ScalarUDF::new_from_impl(DateBin::new()),
5128
ScalarUDF::new_from_impl(DateAddSub::new_add()),
5229
ScalarUDF::new_from_impl(DateAddSub::new_sub()),
5330
ScalarUDF::new_from_impl(UnixTimestamp::new()),
5431
ScalarUDF::new_from_impl(ConvertTz::new()),
5532
]
56-
}
57-
58-
pub fn registerable_arc_scalar_udfs() -> Vec<Arc<ScalarUDF>> {
59-
registerable_scalar_udfs()
60-
.into_iter()
61-
.map(Arc::new)
62-
.collect()
33+
.into_iter()
6334
}
6435

6536
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
@@ -68,18 +39,12 @@ pub enum CubeAggregateUDFKind {
6839
Xirr,
6940
}
7041

71-
pub fn registerable_aggregate_udfs() -> Vec<AggregateUDF> {
72-
vec![
42+
pub fn registerable_aggregate_udfs_iter() -> impl Iterator<Item = AggregateUDF> {
43+
[
7344
AggregateUDF::new_from_impl(HllMergeUDF::new()),
7445
AggregateUDF::new_from_impl(XirrUDF::new()),
7546
]
76-
}
77-
78-
pub fn registerable_arc_aggregate_udfs() -> Vec<Arc<AggregateUDF>> {
79-
registerable_aggregate_udfs()
80-
.into_iter()
81-
.map(Arc::new)
82-
.collect()
47+
.into_iter()
8348
}
8449

8550
pub fn aggregate_udf_by_kind(k: CubeAggregateUDFKind) -> AggregateUDF {
@@ -586,9 +551,6 @@ impl HllCardinality {
586551

587552
HllCardinality { signature }
588553
}
589-
fn descriptor() -> ScalarUDF {
590-
return ScalarUDF::new_from_impl(HllCardinality::new());
591-
}
592554

593555
/// Lets us call [`ScalarFunctionExpr::new`] in some cases without elaborately computing return
594556
/// type or using [`ScalarFunctionExpr::try_new`].

rust/cubestore/cubestore/src/streaming/topic_table_provider.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
use crate::metastore::Column;
2-
use crate::queryplanner::udfs::{registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs};
2+
use crate::queryplanner::udfs::{registerable_aggregate_udfs_iter, registerable_scalar_udfs_iter};
33
use crate::CubeError;
44
use async_trait::async_trait;
55
use chrono::{TimeZone, Utc};
@@ -47,7 +47,7 @@ impl TopicTableProvider {
4747
.collect::<Vec<Field>>(),
4848
));
4949
let mut udfs = SessionStateDefaults::default_scalar_functions();
50-
udfs.append(&mut registerable_arc_scalar_udfs());
50+
udfs.extend(registerable_scalar_udfs_iter().map(Arc::new));
5151
udfs.push(Arc::new(
5252
ScalarUDF::new_from_impl(ParseTimestampFunc::new()),
5353
));
@@ -62,7 +62,7 @@ impl TopicTableProvider {
6262
.collect();
6363

6464
let mut udafs = SessionStateDefaults::default_aggregate_functions();
65-
udafs.append(&mut registerable_arc_aggregate_udfs());
65+
udafs.extend(registerable_aggregate_udfs_iter().map(Arc::new));
6666

6767
let udafs = udafs
6868
.into_iter()

0 commit comments

Comments (0)