Skip to content

Commit a4444e0

Browse files
Neuman968jf9327
andauthored
Updated SSE Example to utilize SharedFlow rather than Deprecated BroadcastChannels (#172)
* Updated Sse example application to replaced deprecated Coroutines api with Shared Flow. * Changed responseTextWriter to respondBytesWriter. Updated write method uses to writeStringUtf8 to avoid blocking in Java applications. --------- Co-authored-by: johnflynn <johnflynn493@gmail.com>
1 parent e8f8b15 commit a4444e0

File tree

1 file changed

+21
-23
lines changed

1 file changed

+21
-23
lines changed

sse/src/main/kotlin/io/ktor/samples/sse/SseApplication.kt

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ import io.ktor.server.engine.*
77
import io.ktor.server.netty.*
88
import io.ktor.server.response.*
99
import io.ktor.server.routing.*
10+
import io.ktor.util.cio.*
11+
import io.ktor.utils.io.*
1012
import kotlinx.coroutines.*
11-
import kotlinx.coroutines.channels.*
13+
import kotlinx.coroutines.flow.*
14+
import kotlin.time.Duration.Companion.seconds
1215

1316
/**
1417
* An SSE (Server-Sent Events) sample application.
1518
* This is the main entrypoint of the application.
1619
*/
17-
@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
20+
@OptIn(ExperimentalCoroutinesApi::class)
1821
fun main() {
1922
/**
2023
* Here we create and start a Netty embedded server listening to the port 8080
@@ -29,43 +32,43 @@ fun main() {
2932
data class SseEvent(val data: String, val event: String? = null, val id: String? = null)
3033

3134
/**
32-
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
35+
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [eventFlow] [Flow]
3336
* and serializing them in a way that is compatible with the Server-Sent Events specification.
3437
*
3538
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
3639
*/
37-
suspend fun ApplicationCall.respondSse(events: ReceiveChannel<SseEvent>) {
40+
suspend fun ApplicationCall.respondSse(eventFlow: Flow<SseEvent>) {
3841
response.cacheControl(CacheControl.NoCache(null))
39-
respondTextWriter(contentType = ContentType.Text.EventStream) {
40-
for (event in events) {
42+
respondBytesWriter(contentType = ContentType.Text.EventStream) {
43+
eventFlow.collect { event ->
4144
if (event.id != null) {
42-
write("id: ${event.id}\n")
45+
writeStringUtf8("id: ${event.id}\n")
4346
}
4447
if (event.event != null) {
45-
write("event: ${event.event}\n")
48+
writeStringUtf8("event: ${event.event}\n")
4649
}
4750
for (dataLine in event.data.lines()) {
48-
write("data: $dataLine\n")
51+
writeStringUtf8("data: $dataLine\n")
4952
}
50-
write("\n")
53+
writeStringUtf8("\n")
5154
flush()
5255
}
5356
}
5457
}
5558

5659
fun Application.module() {
5760
/**
58-
* We produce a [BroadcastChannel] from a suspending function
59-
* that send a [SseEvent] instance each second.
61+
* We produce a [SharedFlow] from a function
62+
* that sends an [SseEvent] instance each second.
6063
*/
61-
val channel = produce { // this: ProducerScope<SseEvent> ->
64+
val sseFlow = flow {
6265
var n = 0
6366
while (true) {
64-
send(SseEvent("demo$n"))
65-
delay(1000)
67+
emit(SseEvent("demo$n"))
68+
delay(1.seconds)
6669
n++
6770
}
68-
}.broadcast()
71+
}.shareIn(GlobalScope, SharingStarted.Eagerly)
6972

7073
/**
7174
* We use the [Routing] plugin to declare [Route] that will be
@@ -75,15 +78,10 @@ fun Application.module() {
7578
/**
7679
* Route to be executed when the client perform a GET `/sse` request.
7780
* It will respond using the [respondSse] extension method defined in this same file
78-
* that uses the [BroadcastChannel] channel we created earlier to emit those events.
81+
* that uses the [SharedFlow] to collect sse events.
7982
*/
8083
get("/sse") {
81-
val events = channel.openSubscription()
82-
try {
83-
call.respondSse(events)
84-
} finally {
85-
events.cancel()
86-
}
84+
call.respondSse(sseFlow)
8785
}
8886
/**
8987
* Route to be executed when the client perform a GET `/` request.

0 commit comments

Comments
 (0)