Skip to content

Commit 5c05b6f

Browse files
committed
Fixes #27
This adds methods `toList` and `toArray`, which effectively perform `collect` under the hood. Plain `collect` works as well, but its result has to be cast to a Kotlin `Array`, which is not too beautiful, and we can't override it (because Kotlin forbids overloading an existing method). Signed-off-by: Pasha Finkelshteyn <asm0dey@jetbrains.com>
1 parent ffdb41d commit 5c05b6f

File tree

4 files changed

+61
-2
lines changed

4 files changed

+61
-2
lines changed

core/src/main/scala/org/jetbrains/spark/extensions/KSparkExtensions.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@
1919
*/
2020
package org.jetbrains.spark.extensions
2121

22+
import java.util
23+
2224
import org.apache.spark.SparkContext
2325
import org.apache.spark.sql._
2426

27+
import scala.collection.JavaConverters
28+
2529
object KSparkExtensions {
2630
def col(d: Dataset[_], name: String): Column = d.col(name)
2731

2832
def col(name: String): Column = functions.col(name)
2933

3034
def lit(literal: Any): Column = functions.lit(literal)
3135

36+
def collectAsList[T](ds: Dataset[T]): util.List[T] = JavaConverters.seqAsJavaList(ds.collect())
37+
38+
3239
def debugCodegen(df: Dataset[_]): Unit = {
3340
import org.apache.spark.sql.execution.debug._
3441
df.debugCodegen()
@@ -39,5 +46,5 @@ object KSparkExtensions {
3946
df.debug()
4047
}
4148

42-
def sparkContext(s:SparkSession): SparkContext = s.sparkContext
49+
def sparkContext(s: SparkSession): SparkContext = s.sparkContext
4350
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*-
2+
* =LICENSE=
3+
* Kotlin Spark API: Examples
4+
* ----------
5+
* Copyright (C) 2019 - 2020 JetBrains
6+
* ----------
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* =LICENSEEND=
19+
*/
20+
package org.jetbrains.spark.api.examples
21+
22+
import org.apache.spark.sql.Row
23+
import org.jetbrains.spark.api.*
24+
25+
/**
 * Example for #27: collecting Dataset contents back onto the driver.
 *
 * Demonstrates the reified `toList`/`toArray` helpers (which perform `collect`
 * under the hood) next to the plain `collect()`/`collectAsList()` calls on a
 * typed Dataset — note that `collect()` still needs an explicit cast to a
 * Kotlin `Array`, which is exactly what the helpers hide.
 */
fun main() {
    withSpark {
        val sd = dsOf(1, 2, 3)
        sd.createOrReplaceTempView("ds")
        spark.sql("select * from ds")
                .withCached {
                    println("asList: ${toList<Int>()}")
                    println("asArray: ${toArray<Int>().contentToString()}")
                    this
                }
                .to<Int>()
                .withCached {
                    // collect() comes back untyped, hence the explicit Array<Int> cast
                    println("typed collect: " + (collect() as Array<Int>).contentToString())
                    // label fixed: was "type collectAsList", inconsistent with the line above
                    println("typed collectAsList: " + collectAsList())
                }

        dsOf(1, 2, 3)
                .map { c(it, it + 1, it + 2) }
                .to<Row>()
                .select("_1")
                .collectAsList()
                .forEach { println(it) }
    }
}

kotlin-spark-api/src/main/kotlin/org/jetbrains/spark/api/ApiV1.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ inline fun <reified KEY, reified VALUE> KeyValueGroupedDataset<KEY, VALUE>.reduc
135135
.map { t -> t._1 to t._2 }
136136

137137
/** Re-encodes this [Dataset] of [T] as a [Dataset] of [R], deriving the encoder from the reified type. */
inline fun <T, reified R> Dataset<T>.downcast(): Dataset<R> = `as`(encoder<R>())

/** Reified-type counterpart of Spark's `Dataset.as(Encoder)`: the target encoder is inferred from [R]. */
inline fun <reified R> Dataset<*>.`as`(): Dataset<R> = `as`(encoder<R>())

/** Alias for [as] so callers need not escape the soft keyword with backticks. */
inline fun <reified R> Dataset<*>.to(): Dataset<R> = `as`(encoder<R>())
138140

139141
inline fun <reified T> Dataset<T>.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func))
140142

@@ -245,6 +247,9 @@ inline fun <reified T, R> Dataset<T>.withCached(blockingUnpersist: Boolean = fal
245247
return cached.executeOnCached().also { cached.unpersist(blockingUnpersist) }
246248
}
247249

250+
/**
 * Collects this [Dataset] into a [List] of [T] on the driver.
 *
 * Exists because Kotlin forbids overloading the already-existing
 * `Dataset.collectAsList` with a reified-type variant; the rows are first
 * re-encoded via [to] and then collected through [KSparkExtensions].
 * NOTE(review): pulls the whole dataset onto the driver — intended for small results.
 */
inline fun <reified T> Dataset<Row>.toList(): List<T> = KSparkExtensions.collectAsList(to<T>())

/**
 * Collects this [Dataset] into a typed Kotlin [Array] of [R] on the driver.
 *
 * The result of `collect()` has to be cast to a Kotlin `Array` explicitly;
 * the cast is unchecked but safe here because [to] re-encodes the rows as [R].
 */
@Suppress("UNCHECKED_CAST")
inline fun <reified R> Dataset<*>.toArray(): Array<R> = to<R>().collect() as Array<R>
248253
/**
249254
* Alternative to [Dataset.show] which returns the source dataset.
250255
* Useful for debug purposes when you need to view the content of a dataset as an intermediate operation

kotlin-spark-api/src/main/kotlin/org/jetbrains/spark/api/SparkHelper.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
*/
2020
package org.jetbrains.spark.api
2121

22-
import org.apache.spark.sql.SparkSession
2322
import org.jetbrains.spark.api.SparkLogLevel.ERROR
2423

2524
/**

0 commit comments

Comments
 (0)