Skip to content

Commit c4a5721

Browse files
authored
Propagate transport coroutine context (#374)
1 parent 717797a commit c4a5721

File tree

5 files changed

+74
-8
lines changed

5 files changed

+74
-8
lines changed

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public abstract class KrpcServer(
6060
*/
6161

6262
@InternalRpcApi
63-
public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job))
63+
public val internalScope: CoroutineScope = CoroutineScope(
64+
transport.coroutineContext + SupervisorJob(transport.coroutineContext.job)
65+
)
6466

6567
private val logger = RpcInternalCommonLogger.logger(rpcInternalObjectId())
6668

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.krpc.test
6+
7+
import kotlinx.coroutines.currentCoroutineContext
8+
import kotlinx.coroutines.test.runTest
9+
import kotlinx.coroutines.withContext
10+
import kotlinx.rpc.krpc.rpcClientConfig
11+
import kotlinx.rpc.krpc.rpcServerConfig
12+
import kotlinx.rpc.krpc.serialization.json.json
13+
import kotlinx.rpc.withService
14+
import kotlin.coroutines.CoroutineContext
15+
import kotlin.test.Test
16+
import kotlin.test.assertEquals
17+
18+
class CoroutineContextPropagationTest {
19+
private val rpcServerConfig = rpcServerConfig {
20+
serialization {
21+
json()
22+
}
23+
}
24+
private val rpcClientConfig = rpcClientConfig {
25+
serialization {
26+
json {
27+
ignoreUnknownKeys = true
28+
}
29+
}
30+
}
31+
32+
data class CoroutineElement(val value: String) : CoroutineContext.Element {
33+
object Key : CoroutineContext.Key<CoroutineElement>
34+
35+
override val key: CoroutineContext.Key<*> = Key
36+
}
37+
38+
@Test
39+
fun test() = runTest {
40+
var actualContext: CoroutineElement? = null
41+
val transport = LocalTransport(CoroutineElement("transport"))
42+
val server = KrpcTestServer(rpcServerConfig, transport.server)
43+
val client = KrpcTestClient(rpcClientConfig, transport.client)
44+
withContext(CoroutineElement("server")) {
45+
server.registerService(Echo::class) {
46+
object : Echo {
47+
override suspend fun echo(message: String): String = run {
48+
actualContext = currentCoroutineContext().get(CoroutineElement.Key)
49+
"response"
50+
}
51+
}
52+
}
53+
}
54+
withContext(CoroutineElement("client")) {
55+
client.withService(Echo::class).echo("request")
56+
}
57+
assertEquals(CoroutineElement("transport"), actualContext)
58+
}
59+
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import kotlin.coroutines.CoroutineContext
1616
import kotlin.time.Clock
1717
import kotlin.time.ExperimentalTime
1818

19-
class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
20-
override val coroutineContext = parentScope?.run { SupervisorJob(coroutineContext.job) }
21-
?: SupervisorJob()
19+
class LocalTransport(
20+
parentContext: CoroutineContext? = null,
21+
) : CoroutineScope {
22+
override val coroutineContext = SupervisorJob(parentContext?.get(Job))
2223

2324
private val clientIncoming = Channel<KrpcTransportMessage>()
2425
private val serverIncoming = Channel<KrpcTransportMessage>()
@@ -27,7 +28,9 @@ class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
2728
val lastMessageSentOnServer = atomic(0L)
2829

2930
val client: KrpcTransport = object : KrpcTransport {
30-
override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job)
31+
override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job).let {
32+
if(parentContext != null) parentContext + it else it
33+
}
3134

3235
@OptIn(ExperimentalTime::class)
3336
override suspend fun send(message: KrpcTransportMessage) {
@@ -41,7 +44,9 @@ class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
4144
}
4245

4346
val server: KrpcTransport = object : KrpcTransport {
44-
override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext)
47+
override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job).let {
48+
if(parentContext != null) parentContext + it else it
49+
}
4550

4651
@OptIn(ExperimentalTime::class)
4752
override suspend fun send(message: KrpcTransportMessage) {

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class CancellationToolkit(scope: CoroutineScope) : CoroutineScope by scope {
5050
}
5151
}
5252

53-
val transport = LocalTransport(this)
53+
val transport = LocalTransport(this.coroutineContext.job)
5454

5555
val client by lazy {
5656
KrpcTestClient(rpcClientConfig {

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/WireSamplingTestScope.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ class WireSamplingTestScope(private val sampleName: String, scope: TestScope) :
229229
}
230230

231231
private class WireToolkit(scope: CoroutineScope, format: SamplingFormat, val logger: RpcInternalCommonLogger? = null) {
232-
val transport = LocalTransport(scope)
232+
val transport = LocalTransport(scope.coroutineContext.job)
233233

234234
val client by lazy {
235235
KrpcTestClient(rpcClientConfig {

0 commit comments

Comments
 (0)