Commit ab84990

Truncate and limit variables and onInterrupt stream cleanup for jupyter notebooks (1.1.0) (#158)
* Added truncate and limit variables for Jupyter notebooks
* Added support for onInterrupt in Spark Streaming
1 parent 3092d4b commit ab84990

File tree

5 files changed: +167 −16 lines

jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt

Lines changed: 62 additions & 9 deletions
@@ -22,10 +22,9 @@ package org.jetbrains.kotlinx.spark.api.jupyter
 import org.apache.spark.api.java.JavaRDDLike
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
-import org.jetbrains.kotlinx.jupyter.api.FieldValue
-import org.jetbrains.kotlinx.jupyter.api.HTML
-import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
+import org.jetbrains.kotlinx.jupyter.api.*
 import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration
+import kotlin.reflect.typeOf
 
 abstract class Integration : JupyterIntegration() {
 
@@ -34,14 +33,25 @@ abstract class Integration : JupyterIntegration() {
     private val scalaVersion = "2.12.15"
     private val spark3Version = "3.2.1"
 
+    private val displayLimit = "DISPLAY_LIMIT"
+    private val displayLimitDefault = 20
+    private val displayTruncate = "DISPLAY_TRUNCATE"
+    private val displayTruncateDefault = 30
+
     /**
      * Will be run after importing all dependencies
      */
-    abstract fun KotlinKernelHost.onLoaded()
+    open fun KotlinKernelHost.onLoaded() = Unit
+
+    open fun KotlinKernelHost.onShutdown() = Unit
+
+    open fun KotlinKernelHost.onInterrupt() = Unit
+
+    open fun KotlinKernelHost.beforeCellExecution() = Unit
 
-    abstract fun KotlinKernelHost.onShutdown()
+    open fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
 
-    abstract fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue)
+    open fun Builder.onLoadedAlsoDo() = Unit
 
     open val dependencies: Array<String> = arrayOf(
         "org.apache.spark:spark-repl_$scalaCompatVersion:$spark3Version",
@@ -84,32 +94,75 @@
         import(*imports)
 
         onLoaded {
+            declare(
+                VariableDeclaration(
+                    name = displayLimit,
+                    value = displayLimitDefault,
+                    type = typeOf<Int>(),
+                    isMutable = true,
+                ),
+                VariableDeclaration(
+                    name = displayTruncate,
+                    value = displayTruncateDefault,
+                    type = typeOf<Int>(),
+                    isMutable = true,
+                ),
+            )
             onLoaded()
         }
 
         beforeCellExecution {
             execute("""scala.Console.setOut(System.out)""")
+
+            beforeCellExecution()
         }
 
         afterCellExecution { snippetInstance, result ->
             afterCellExecution(snippetInstance, result)
         }
 
+        onInterrupt {
+            onInterrupt()
+        }
+
         onShutdown {
             onShutdown()
         }
 
+        fun getLimitAndTruncate() = Pair(
+            notebook
+                .variablesState[displayLimit]
+                ?.value
+                ?.getOrNull() as? Int
+                ?: displayLimitDefault,
+            notebook
+                .variablesState[displayTruncate]
+                ?.value
+                ?.getOrNull() as? Int
+                ?: displayTruncateDefault
+        )
+
         // Render Dataset
         render<Dataset<*>> {
-            HTML(it.toHtml())
+            val (limit, truncate) = getLimitAndTruncate()
+
+            HTML(it.toHtml(limit = limit, truncate = truncate))
         }
 
         render<RDD<*>> {
-            HTML(it.toJavaRDD().toHtml())
+            val (limit, truncate) = getLimitAndTruncate()
+
+            HTML(it.toJavaRDD().toHtml(limit = limit, truncate = truncate))
         }
 
         render<JavaRDDLike<*, *>> {
-            HTML(it.toHtml())
+            val (limit, truncate) = getLimitAndTruncate()
+
+            HTML(it.toHtml(limit = limit, truncate = truncate))
         }
+
+        onLoadedAlsoDo()
     }
 }
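
With this change, Integration declares two mutable Int variables, DISPLAY_LIMIT and DISPLAY_TRUNCATE, directly into the notebook and reads them back through notebook.variablesState before rendering, so table output can be tuned per notebook. A minimal usage sketch (the cell contents below are hypothetical; the variable names and defaults come from the diff above):

    // Notebook cell, after loading the integration (e.g. %use spark):
    DISPLAY_LIMIT = 5       // render at most 5 rows per Dataset/RDD (default 20)
    DISPLAY_TRUNCATE = 10   // truncate rendered cells to 10 characters (default 30)

    // Any Dataset returned from a later cell is now rendered with these limits,
    // because the render<Dataset<*>> handler fetches both values via variablesState.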

jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt

Lines changed: 0 additions & 2 deletions
@@ -73,6 +73,4 @@ internal class SparkIntegration : Integration() {
     override fun KotlinKernelHost.onShutdown() {
         execute("""spark.stop()""")
     }
-
-    override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
 }
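
This deletion is possible because the hooks in Integration are now open with no-op defaults instead of abstract, so subclasses only override what they actually use. A hypothetical minimal subclass illustrating the new contract:

    // Sketch only: onInterrupt, beforeCellExecution, afterCellExecution, and
    // onLoadedAlsoDo all inherit the `= Unit` defaults from Integration.
    internal class MinimalIntegration : Integration() {
        override fun KotlinKernelHost.onLoaded() {
            execute("""println("loaded")""")
        }
    }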

jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.kt

Lines changed: 95 additions & 4 deletions
@@ -20,33 +20,124 @@
 package org.jetbrains.kotlinx.spark.api.jupyter
 
 
+import org.apache.spark.streaming.StreamingContextState
+import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.intellij.lang.annotations.Language
-import org.jetbrains.kotlinx.jupyter.api.FieldValue
 import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
+import org.jetbrains.kotlinx.jupyter.api.VariableDeclaration
+import org.jetbrains.kotlinx.jupyter.api.declare
+import kotlin.reflect.typeOf
 
 /**
  * %use spark-streaming
  */
 @Suppress("UNUSED_VARIABLE", "LocalVariableName")
-@OptIn(ExperimentalStdlibApi::class)
 internal class SparkStreamingIntegration : Integration() {
 
     override val imports: Array<String> = super.imports + arrayOf(
         "org.apache.spark.deploy.SparkHadoopUtil",
         "org.apache.hadoop.conf.Configuration",
     )
 
+    private val sscCollection = mutableSetOf<JavaStreamingContext>()
+
     override fun KotlinKernelHost.onLoaded() {
+
+        declare(
+            VariableDeclaration(
+                name = ::sscCollection.name,
+                value = sscCollection,
+                isMutable = false,
+                type = typeOf<MutableSet<JavaStreamingContext>>(),
+            )
+        )
+
         val _0 = execute("""%dumpClassesForSpark""")
 
         @Language("kts")
         val _1 = listOf(
+            """
+            @JvmOverloads
+            fun withSparkStreaming(
+                batchDuration: Duration = Durations.seconds(1L),
+                checkpointPath: String? = null,
+                hadoopConf: Configuration = SparkHadoopUtil.get().conf(),
+                createOnError: Boolean = false,
+                props: Map<String, Any> = emptyMap(),
+                master: String = SparkConf().get("spark.master", "local[*]"),
+                appName: String = "Kotlin Spark Sample",
+                timeout: Long = -1L,
+                startStreamingContext: Boolean = true,
+                func: KSparkStreamingSession.() -> Unit,
+            ) {
+
+                // will only be set when a new context is created
+                var kSparkStreamingSession: KSparkStreamingSession? = null
+
+                val creatingFunc = {
+                    val sc = SparkConf()
+                        .setAppName(appName)
+                        .setMaster(master)
+                        .setAll(
+                            props
+                                .map { (key, value) -> key X value.toString() }
+                                .asScalaIterable()
+                        )
+
+                    val ssc = JavaStreamingContext(sc, batchDuration)
+                    ssc.checkpoint(checkpointPath)
+
+                    kSparkStreamingSession = KSparkStreamingSession(ssc)
+                    func(kSparkStreamingSession!!)
+
+                    ssc
+                }
+
+                val ssc = when {
+                    checkpointPath != null ->
+                        JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)
+
+                    else -> creatingFunc()
+                }
+                sscCollection += ssc
+
+                if (startStreamingContext) {
+                    ssc.start()
+                    kSparkStreamingSession?.invokeRunAfterStart()
+                }
+                ssc.awaitTerminationOrTimeout(timeout)
+                ssc.stop()
+            }
+            """.trimIndent(),
             """
             println("To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.")""".trimIndent(),
         ).map(::execute)
     }
 
-    override fun KotlinKernelHost.onShutdown() = Unit
+    private fun cleanUp(e: Throwable): String {
+        while (sscCollection.isNotEmpty())
+            sscCollection.first().let {
+                while (it.state != StreamingContextState.STOPPED) {
+                    try {
+                        it.stop(true, true)
+                    } catch (_: Exception) {
+                    }
+                }
+                sscCollection.remove(it)
+            }
+
+        return "Spark streams cleaned up. Cause: $e"
+    }
+
+    override fun Builder.onLoadedAlsoDo() {
+        renderThrowable<IllegalMonitorStateException> {
+            cleanUp(it)
+        }
+    }
 
-    override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
+    override fun KotlinKernelHost.onInterrupt() {
+        println(
+            cleanUp(InterruptedException("Kernel was interrupted."))
+        )
+    }
 }
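
Every JavaStreamingContext created by withSparkStreaming is registered in sscCollection, so cleanUp can stop all live streams when the kernel is interrupted or an IllegalMonitorStateException is rendered. A sketch of a streaming cell as the printed hint suggests (the socket source, host, and port are illustrative assumptions, and `ssc` is assumed to be the JavaStreamingContext held by KSparkStreamingSession):

    // Hypothetical notebook cell using the helper defined above:
    withSparkStreaming(batchDuration = Durations.seconds(2L), timeout = 10_000L) {
        // read lines from a (placeholder) local socket source
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.foreachRDD { rdd, _ -> println(rdd.take(10)) }
    }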

jupyter/src/test/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/JupyterTests.kt

Lines changed: 9 additions & 0 deletions
@@ -28,6 +28,7 @@ import io.kotest.matchers.string.shouldContain
 import io.kotest.matchers.types.shouldBeInstanceOf
 import jupyter.kotlin.DependsOn
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.intellij.lang.annotations.Language
 import org.jetbrains.kotlinx.jupyter.EvalRequestData
 import org.jetbrains.kotlinx.jupyter.ReplForJupyter
@@ -263,6 +264,14 @@ class JupyterStreamingTests : ShouldSpec({
     context("Jupyter") {
         withRepl {
 
+            // For when onInterrupt is implemented in the Jupyter kernel
+            should("Have sscCollection instance") {
+
+                @Language("kts")
+                val sscCollection = exec("""sscCollection""")
+                sscCollection as? MutableSet<JavaStreamingContext> shouldNotBe null
+            }
+
             should("Not have spark instance") {
                 shouldThrowAny {
                     @Language("kts")

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
         <kotest-extensions-allure.version>1.1.0</kotest-extensions-allure.version>
         <kotest-extensions-testcontainers.version>1.3.1</kotest-extensions-testcontainers.version>
         <kotest.version>5.2.3</kotest.version>
-        <kotlin-jupyter-api.version>0.11.0-83</kotlin-jupyter-api.version>
+        <kotlin-jupyter-api.version>0.11.0-95</kotlin-jupyter-api.version>
         <kotlin.version>1.6.21</kotlin.version>
         <kotlinx.html.version>0.7.5</kotlinx.html.version>
         <spark3.version>3.2.1</spark3.version>
