Commit cfd7446

Spark: Implement SupportsPushDownTopN
1 parent 73bf76b commit cfd7446

4 files changed: +33 −3 lines

spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala

Lines changed: 15 additions & 2 deletions
@@ -90,15 +90,28 @@ object ExprUtils extends SQLConfHelper {
     case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
   }
 
-  def toClickHouse(transform: Transform): Expr = transform match {
+  def toClickHouseOpt(v2Expr: V2Expression): Option[Expr] = Try(toClickHouse(v2Expr)).toOption
+
+  def toClickHouse(v2Expr: V2Expression): Expr = v2Expr match {
+    // sort order
+    case sortOrder: SortOrder =>
+      val asc = sortOrder.direction == SortDirection.ASCENDING
+      val nullFirst = sortOrder.nullOrdering == NullOrdering.NULLS_FIRST
+      OrderExpr(toClickHouse(sortOrder.expression), asc, nullFirst)
+    // transform
     case YearsTransform(FieldReference(Seq(col))) => FuncExpr("toYear", List(FieldRef(col)))
     case MonthsTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMM", List(FieldRef(col)))
     case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
     case HoursTransform(FieldReference(Seq(col))) => FuncExpr("toHour", List(FieldRef(col)))
     case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
     case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not supported yet: $bucket")
-    case other: Transform => throw CHClientException(s"Unsupported transform: $other")
+    // others
+    case l: Literal[_] => SQLExpr(l.toString)
+    case FieldReference(Seq(col)) => FieldRef(col)
+    case gse: GeneralScalarExpression => SQLExpr(gse.toString) // TODO: exclude unsupported expressions
+    // unsupported
+    case unsupported: V2Expression => throw CHClientException(s"Unsupported expression: $unsupported")
   }
 
   def inferTransformSchema(
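
A note on the new SortOrder branch: toClickHouseOpt wraps the translation in Try, so callers can probe whether a V2 expression is translatable without handling CHClientException themselves. A minimal sketch of the call, using what I take to be the standard Spark 3.3 Expressions factory methods; the commented results assume OrderExpr and FieldRef behave as the patterns above suggest:

  import org.apache.spark.sql.clickhouse.ExprUtils
  import org.apache.spark.sql.connector.expressions.{Expressions, NullOrdering, SortDirection}

  // "id DESC NULLS LAST" as a Spark V2 sort order
  val order = Expressions.sort(Expressions.column("id"), SortDirection.DESCENDING, NullOrdering.NULLS_LAST)

  // Direction and null ordering map onto OrderExpr's two boolean flags
  ExprUtils.toClickHouseOpt(order) // Some(OrderExpr(FieldRef("id"), asc = false, nullFirst = false))

  // Untranslatable expressions yield None instead of throwing
  ExprUtils.toClickHouseOpt(Expressions.bucket(8, "id")) // None: bucket transforms are rejected above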

spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseRead.scala

Lines changed: 16 additions & 1 deletion
@@ -16,8 +16,9 @@ package xenon.clickhouse.read
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.clickhouse.ClickHouseSQLConf._
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.clickhouse.ExprUtils
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
+import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.read.partitioning.{Partitioning, UnknownPartitioning}
@@ -38,6 +39,7 @@ class ClickHouseScanBuilder(
     metadataSchema: StructType,
     partitionTransforms: Array[Transform]
 ) extends ScanBuilder
+    with SupportsPushDownTopN
     with SupportsPushDownLimit
     with SupportsPushDownFilters
     with SupportsPushDownAggregates
@@ -56,13 +58,25 @@ class ClickHouseScanBuilder(
     physicalSchema.fields ++ reservedMetadataSchema.fields
   )
 
+  private var _orders: Option[String] = None
+
   private var _limit: Option[Int] = None
 
   override def pushLimit(limit: Int): Boolean = {
     this._limit = Some(limit)
     true
   }
 
+  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
+    val translated = orders.map(sortOrder => ExprUtils.toClickHouseOpt(sortOrder))
+    if (translated.exists(_.isEmpty)) {
+      return false
+    }
+    this._orders = Some(translated.flatten.mkString(", "))
+    this._limit = Some(limit)
+    true
+  }
+
   private var _pushedFilters = Array.empty[Filter]
 
   override def pushedFilters: Array[Filter] = this._pushedFilters
@@ -121,6 +135,7 @@ class ClickHouseScanBuilder(
       readSchema = _readSchema,
       filtersExpr = compileFilters(AlwaysTrue :: pushedFilters.toList),
       groupByClause = _groupByClause,
+      orderByClause = _orders.map(o => s"ORDER BY $o"),
       limit = _limit
     ))
   }
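
The pushdown contract, as implemented: if any sort key fails to translate, pushTopN returns false and Spark keeps its own Sort and Limit operators in the plan, so correctness never depends on a partial translation. A hedged usage sketch; the catalog name clickhouse and table db.tbl are invented for illustration:

  // Hypothetical query shape that lets Spark plan a TopN pushdown into this
  // ScanBuilder: an ORDER BY feeding directly into a LIMIT.
  val top10 = spark.sql("SELECT id, name FROM clickhouse.db.tbl ORDER BY id DESC LIMIT 10")
  top10.explain() // when translation succeeds, the pushed TopN shows up in the scan node

Note that a successful pushTopN also records the limit, so it subsumes the plain pushLimit path.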

spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseReader.scala

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ abstract class ClickHouseReader[Record](
        |FROM `$database`.`$table`
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
+       |${scanJob.orderByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
        |""".stripMargin
 }
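
For illustration, roughly what this template renders for one ClickHouse part once a TopN has been pushed. The SELECT list, part filter, and identifiers below are invented; the blank line is a real artifact of groupByClause.getOrElse("") when no aggregate was pushed:

  // Hypothetical rendered query, assuming scanJob.orderByClause = Some("ORDER BY id DESC")
  // and scanJob.limit = Some(10):
  val rendered =
    """SELECT `id`, `name`
      |FROM `db`.`tbl`
      |WHERE (_partition_id = '202301') AND (1=1)
      |
      |ORDER BY id DESC
      |LIMIT 10
      |""".stripMargin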

spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ScanJobDescription.scala

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ case class ScanJobDescription(
   // into Scan tasks because the check happens in planning phase on driver side.
   filtersExpr: String = "1=1",
   groupByClause: Option[String] = None,
+  orderByClause: Option[String] = None,
   limit: Option[Int] = None
 ) {
 
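A small sketch of how the new field travels end to end: pushTopN fills the builder state, the builder copies it into this description when constructing the scan (last hunk of ClickHouseRead.scala above), and the reader splices it into the query verbatim. Assuming some existing scanJob: ScanJobDescription:

  // Hypothetical: the state a successful TopN pushdown of "ORDER BY ts DESC" with LIMIT 100 produces
  val withTopN = scanJob.copy(
    orderByClause = Some("ORDER BY ts DESC"),
    limit = Some(100)
  )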
