
Commit 3092d4b

Merge pull request #157 from JetBrains/readme-streaming
Readme streaming (1.1.0 release)
2 parents 204ac2b + e379ea7

File tree: 6 files changed, +124 -86 lines

README.md

Lines changed: 1 addition & 1 deletion
@@ -271,7 +271,7 @@ For more information, check the [wiki](https://github.com/JetBrains/kotlin-spark
 
 ## Examples
 
-For more, check out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
+For more, check out [examples](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
 To get up and running quickly, check out this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
 
 ## Reporting issues/Support

jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt

Lines changed: 58 additions & 36 deletions
@@ -22,6 +22,7 @@ package org.jetbrains.kotlinx.spark.api.jupyter
 import org.apache.spark.api.java.JavaRDDLike
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
+import org.jetbrains.kotlinx.jupyter.api.FieldValue
 import org.jetbrains.kotlinx.jupyter.api.HTML
 import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
 import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration
@@ -33,50 +34,71 @@ abstract class Integration : JupyterIntegration() {
     private val scalaVersion = "2.12.15"
     private val spark3Version = "3.2.1"
 
+    /**
+     * Will be run after importing all dependencies
+     */
     abstract fun KotlinKernelHost.onLoaded()
 
-    override fun Builder.onLoaded() {
+    abstract fun KotlinKernelHost.onShutdown()
+
+    abstract fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue)
+
+    open val dependencies: Array<String> = arrayOf(
+        "org.apache.spark:spark-repl_$scalaCompatVersion:$spark3Version",
+        "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion",
+        "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion",
+        "org.apache.spark:spark-sql_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-streaming_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-mllib_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-sql_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-graphx_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-launcher_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-catalyst_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-streaming_$scalaCompatVersion:$spark3Version",
+        "org.apache.spark:spark-core_$scalaCompatVersion:$spark3Version",
+        "org.scala-lang:scala-library:$scalaVersion",
+        "org.scala-lang.modules:scala-xml_$scalaCompatVersion:2.0.1",
+        "org.scala-lang:scala-reflect:$scalaVersion",
+        "org.scala-lang:scala-compiler:$scalaVersion",
+        "commons-io:commons-io:2.11.0",
+    )
 
-        dependencies(
-            "org.apache.spark:spark-repl_$scalaCompatVersion:$spark3Version",
-            "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion",
-            "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion",
-            "org.apache.spark:spark-sql_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-streaming_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-mllib_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-sql_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-graphx_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-launcher_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-catalyst_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-streaming_$scalaCompatVersion:$spark3Version",
-            "org.apache.spark:spark-core_$scalaCompatVersion:$spark3Version",
-            "org.scala-lang:scala-library:$scalaVersion",
-            "org.scala-lang.modules:scala-xml_$scalaCompatVersion:2.0.1",
-            "org.scala-lang:scala-reflect:$scalaVersion",
-            "org.scala-lang:scala-compiler:$scalaVersion",
-            "commons-io:commons-io:2.11.0",
-        )
-
-        import(
-            "org.jetbrains.kotlinx.spark.api.*",
-            "org.jetbrains.kotlinx.spark.api.tuples.*",
-            *(1..22).map { "scala.Tuple$it" }.toTypedArray(),
-            "org.apache.spark.sql.functions.*",
-            "org.apache.spark.*",
-            "org.apache.spark.sql.*",
-            "org.apache.spark.api.java.*",
-            "scala.collection.Seq",
-            "org.apache.spark.rdd.*",
-            "java.io.Serializable",
-            "org.apache.spark.streaming.api.java.*",
-            "org.apache.spark.streaming.api.*",
-            "org.apache.spark.streaming.*",
-        )
+    open val imports: Array<String> = arrayOf(
+        "org.jetbrains.kotlinx.spark.api.*",
+        "org.jetbrains.kotlinx.spark.api.tuples.*",
+        *(1..22).map { "scala.Tuple$it" }.toTypedArray(),
+        "org.apache.spark.sql.functions.*",
+        "org.apache.spark.*",
+        "org.apache.spark.sql.*",
+        "org.apache.spark.api.java.*",
+        "scala.collection.Seq",
+        "org.apache.spark.rdd.*",
+        "java.io.Serializable",
+        "org.apache.spark.streaming.api.java.*",
+        "org.apache.spark.streaming.api.*",
+        "org.apache.spark.streaming.*",
+    )
+
+    override fun Builder.onLoaded() {
+        dependencies(*dependencies)
+        import(*imports)
 
         onLoaded {
             onLoaded()
         }
 
+        beforeCellExecution {
+            execute("""scala.Console.setOut(System.out)""")
+        }
+
+        afterCellExecution { snippetInstance, result ->
+            afterCellExecution(snippetInstance, result)
+        }
+
+        onShutdown {
+            onShutdown()
+        }
+
         // Render Dataset
         render<Dataset<*>> {
             HTML(it.toHtml())
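
The upshot of this refactor: the dependency coordinates and default imports move out of `Builder.onLoaded()` into open `dependencies` and `imports` properties, and the builder now wires up `beforeCellExecution` (pointing `scala.Console` at the current `System.out`, presumably so Scala-side printing such as `Dataset.show()` reaches the active cell), `afterCellExecution`, and `onShutdown` hooks. A minimal sketch of what a concrete integration can now do, assuming only the API shown in this diff (the class name and extra coordinate are hypothetical; `SparkStreamingIntegration` below applies the same pattern to `imports`):

    // Hypothetical subclass illustrating the new extension points.
    internal class MyIntegration : Integration() {

        // Inherit every default coordinate and append one more (hypothetical).
        override val dependencies: Array<String> = super.dependencies + arrayOf(
            "com.example:extra-lib:1.0.0",
        )

        override fun KotlinKernelHost.onLoaded() = Unit

        // Runs when the kernel shuts down, e.g. to release cluster resources.
        override fun KotlinKernelHost.onShutdown() = Unit

        override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
    }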

jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt

Lines changed: 7 additions & 0 deletions
@@ -21,6 +21,7 @@ package org.jetbrains.kotlinx.spark.api.jupyter
 
 
 import org.intellij.lang.annotations.Language
+import org.jetbrains.kotlinx.jupyter.api.FieldValue
 import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
 
 /**
@@ -68,4 +69,10 @@ internal class SparkIntegration : Integration() {
         val udf: UDFRegistration get() = spark.udf()""".trimIndent(),
         ).map(::execute)
     }
+
+    override fun KotlinKernelHost.onShutdown() {
+        execute("""spark.stop()""")
+    }
+
+    override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
 }
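
With this `onShutdown` override, stopping the notebook kernel now runs `spark.stop()` in the REPL, so the session created by `%use spark` is cleaned up together with the kernel. For comparison, a plain (non-notebook) program gets the same cleanup from the `withSpark` block; a sketch assuming the kotlin-spark-api DSL:

    import org.jetbrains.kotlinx.spark.api.withSpark

    // withSpark creates a SparkSession, runs the block, and stops the session
    // afterwards - the same cleanup the Jupyter integration now performs in
    // onShutdown.
    fun main() = withSpark {
        spark.sql("SELECT 1").show()
    }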

jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.kt

Lines changed: 11 additions & 19 deletions
@@ -19,27 +19,10 @@
  */
 package org.jetbrains.kotlinx.spark.api.jupyter
 
-import kotlinx.html.*
-import kotlinx.html.stream.appendHTML
-import org.apache.spark.api.java.JavaRDDLike
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Dataset
-import org.apache.spark.unsafe.array.ByteArrayMethods
-import org.intellij.lang.annotations.Language
-import org.jetbrains.kotlinx.jupyter.api.HTML
-import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration
-import org.jetbrains.kotlinx.spark.api.*
-import java.io.InputStreamReader
-
 
-import org.apache.spark.*
+import org.intellij.lang.annotations.Language
+import org.jetbrains.kotlinx.jupyter.api.FieldValue
 import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
-import scala.collection.*
-import org.jetbrains.kotlinx.spark.api.SparkSession
-import scala.Product
-import java.io.Serializable
-import scala.collection.Iterable as ScalaIterable
-import scala.collection.Iterator as ScalaIterator
 
 /**
  * %use spark-streaming
@@ -48,6 +31,11 @@ import scala.collection.Iterator as ScalaIterator
 @OptIn(ExperimentalStdlibApi::class)
 internal class SparkStreamingIntegration : Integration() {
 
+    override val imports: Array<String> = super.imports + arrayOf(
+        "org.apache.spark.deploy.SparkHadoopUtil",
+        "org.apache.hadoop.conf.Configuration",
+    )
+
     override fun KotlinKernelHost.onLoaded() {
         val _0 = execute("""%dumpClassesForSpark""")
 
@@ -57,4 +45,8 @@ internal class SparkStreamingIntegration : Integration() {
             println("To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.")""".trimIndent(),
         ).map(::execute)
     }
+
+    override fun KotlinKernelHost.onShutdown() = Unit
+
+    override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
 }
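
The `imports` override here is the first consumer of the new open property: the streaming integration inherits every default import and adds the two Hadoop-related ones. In a notebook, a streaming cell after `%use spark-streaming` then looks roughly like this (a sketch assuming the kotlin-spark-api streaming DSL exercised in the tests below; host and port are placeholders):

    // Inside a Jupyter cell after `%use spark-streaming` (sketch):
    withSparkStreaming(Duration(1000), timeout = 10_000) {
        // Read lines from a placeholder socket source.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.foreachRDD { rdd, _ ->
            withSpark(rdd) {
                rdd.toDS().show()
            }
        }
    }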

jupyter/src/test/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/JupyterTests.kt

Lines changed: 46 additions & 29 deletions
@@ -21,15 +21,13 @@ package org.jetbrains.kotlinx.spark.api.jupyter
 
 import io.kotest.assertions.throwables.shouldThrowAny
 import io.kotest.core.spec.style.ShouldSpec
-import io.kotest.matchers.collections.shouldBeIn
 import io.kotest.matchers.nulls.shouldNotBeNull
 import io.kotest.matchers.shouldBe
 import io.kotest.matchers.shouldNotBe
 import io.kotest.matchers.string.shouldContain
 import io.kotest.matchers.types.shouldBeInstanceOf
 import jupyter.kotlin.DependsOn
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.streaming.Duration
 import org.intellij.lang.annotations.Language
 import org.jetbrains.kotlinx.jupyter.EvalRequestData
 import org.jetbrains.kotlinx.jupyter.ReplForJupyter
@@ -40,11 +38,8 @@ import org.jetbrains.kotlinx.jupyter.libraries.EmptyResolutionInfoProvider
 import org.jetbrains.kotlinx.jupyter.repl.EvalResultEx
 import org.jetbrains.kotlinx.jupyter.testkit.ReplProvider
 import org.jetbrains.kotlinx.jupyter.util.PatternNameAcceptanceRule
-import org.jetbrains.kotlinx.spark.api.tuples.*
-import org.jetbrains.kotlinx.spark.api.*
-import scala.Tuple2
+import org.jetbrains.kotlinx.spark.api.SparkSession
 import java.io.Serializable
-import java.util.*
 import kotlin.script.experimental.jvm.util.classpathFromClassloader
 
 class JupyterTests : ShouldSpec({
@@ -155,16 +150,19 @@
         should("render JavaRDDs with custom class") {
 
             @Language("kts")
-            val klass = exec("""
+            val klass = exec(
+                """
                 data class Test(
                     val longFirstName: String,
                     val second: LongArray,
                     val somethingSpecial: Map<Int, String>,
                 ): Serializable
-            """.trimIndent())
+                """.trimIndent()
+            )
 
             @Language("kts")
-            val html = execHtml("""
+            val html = execHtml(
+                """
                 val rdd = sc.parallelize(
                     listOf(
                         Test("aaaaaaaaa", longArrayOf(1L, 100000L, 24L), mapOf(1 to "one", 2 to "two")),
@@ -246,8 +244,10 @@ class JupyterStreamingTests : ShouldSpec({
             host = this,
             integrationTypeNameRules = listOf(
                 PatternNameAcceptanceRule(false, "org.jetbrains.kotlinx.spark.api.jupyter.**"),
-                PatternNameAcceptanceRule(true,
-                    "org.jetbrains.kotlinx.spark.api.jupyter.SparkStreamingIntegration"),
+                PatternNameAcceptanceRule(
+                    true,
+                    "org.jetbrains.kotlinx.spark.api.jupyter.SparkStreamingIntegration"
+                ),
             ),
         )
     }
@@ -279,29 +279,46 @@
             }
         }
 
-        should("stream") {
-            val input = listOf("aaa", "bbb", "aaa", "ccc")
-            val counter = Counter(0)
-
-            withSparkStreaming(Duration(10), timeout = 1000) {
-
-                val (counterBroadcast, queue) = withSpark(ssc) {
-                    spark.broadcast(counter) X LinkedList(listOf(sc.parallelize(input)))
-                }
+        xshould("stream") {
 
-                val inputStream = ssc.queueStream(queue)
-
-                inputStream.foreachRDD { rdd, _ ->
-                    withSpark(rdd) {
-                        rdd.toDS().forEach {
-                            it shouldBeIn input
-                            counterBroadcast.value.value++
+            @Language("kts")
+            val value = exec(
+                """
+                import java.util.LinkedList
+                import org.apache.spark.api.java.function.ForeachFunction
+                import org.apache.spark.util.LongAccumulator
+
+
+                val input = arrayListOf("aaa", "bbb", "aaa", "ccc")
+
+                @Volatile
+                var counter: LongAccumulator? = null
+
+                withSparkStreaming(Duration(10), timeout = 1_000) {
+
+                    val queue = withSpark(ssc) {
+                        LinkedList(listOf(sc.parallelize(input)))
+                    }
+
+                    val inputStream = ssc.queueStream(queue)
+
+                    inputStream.foreachRDD { rdd, _ ->
+                        withSpark(rdd) {
+                            if (counter == null)
+                                counter = sc.sc().longAccumulator()
+
+                            rdd.toDS().showDS().forEach {
+                                if (it !in input) error(it + " should be in input")
+                                counter!!.add(1L)
+                            }
                         }
                     }
                 }
-            }
+                counter!!.sum()
+                """.trimIndent()
+            ) as Long
 
-            counter.value shouldBe input.size
+            value shouldBe 4L
         }
 
     }
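
Two changes in the streaming test: it now runs entirely inside the REPL via `exec` (so the snippet is compiled by the kernel under test), and the broadcast mutable `Counter` is replaced by a `LongAccumulator`, Spark's supported mechanism for aggregating counts from executor-side code; note the test is also disabled (`xshould`). A standalone sketch of the accumulator pattern, assuming the kotlin-spark-api `withSpark` DSL:

    import org.jetbrains.kotlinx.spark.api.withSpark

    fun main() = withSpark {
        // An accumulator may be updated from tasks on executors; a broadcast
        // variable is effectively read-only there, so incrementing a broadcast
        // counter (the old approach) is not a reliable way to count records.
        val seen = sc.sc().longAccumulator("seen")
        dsOf("aaa", "bbb", "aaa", "ccc").forEach { seen.add(1L) }
        println(seen.sum()) // 4
    }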

kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt

Lines changed: 1 addition & 1 deletion
@@ -113,7 +113,7 @@ class KSparkStreamingSession(@Transient val ssc: JavaStreamingContext) : Serializable
         runAfterStart = block
     }
 
-    internal fun invokeRunAfterStart(): Unit = runAfterStart()
+    fun invokeRunAfterStart(): Unit = runAfterStart()
 
 
     /** Creates new spark session from given [sc]. */
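
Widening `invokeRunAfterStart` from internal to public lets code outside this module, such as the Jupyter streaming integration, trigger the callback registered on the session. From the user's side the hook looks roughly like this (a sketch: the setter name `setRunAfterStart` is inferred from the `runAfterStart = block` assignment shown above and is an assumption; only `invokeRunAfterStart`'s new visibility is confirmed by this diff):

    // Sketch - setRunAfterStart is an assumed name.
    withSparkStreaming(Duration(1000), timeout = 10_000) {
        setRunAfterStart {
            println("streaming context started")
        }
    }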
