Skip to content

Commit b01ceb3

Browse files
authored
Fix bidirectional flows in non-suspend streams (#316)
1 parent 34638f4 commit b01ceb3

File tree

4 files changed

+55
-13
lines changed

4 files changed

+55
-13
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public abstract class KrpcClient(
311311
connector.sendMessage(firstMessage)
312312
}
313313

314-
private val nonSuspendingSerialFormat = config.serialFormatInitializer.build()
314+
private val noFlowSerialFormat = config.serialFormatInitializer.build()
315315

316316
@Suppress("detekt.CyclomaticComplexMethod")
317317
override fun <T> callServerStreaming(call: RpcCall): Flow<T> {
@@ -326,21 +326,33 @@ public abstract class KrpcClient(
326326

327327
val channel = Channel<T>()
328328

329-
val request = serializeRequest(
330-
callId = callId,
331-
call = call,
332-
callable = callable,
333-
serialFormat = nonSuspendingSerialFormat,
334-
pluginParams = mapOf(KrpcPluginKey.NON_SUSPENDING_SERVER_FLOW_MARKER to ""),
335-
)
336-
337-
connector.sendMessage(request)
329+
val streamScope = StreamScope(currentCoroutineContext())
338330

339331
try {
332+
val streamContext = LazyKrpcStreamContext(streamScope, null) {
333+
KrpcStreamContext(callId, config, connectionId, call.serviceId, it)
334+
}
335+
336+
val serialFormat = prepareSerialFormat(streamContext)
337+
338+
val request = serializeRequest(
339+
callId = callId,
340+
call = call,
341+
callable = callable,
342+
serialFormat = serialFormat,
343+
pluginParams = mapOf(KrpcPluginKey.NON_SUSPENDING_SERVER_FLOW_MARKER to ""),
344+
)
345+
346+
connector.sendMessage(request)
347+
340348
connector.subscribeToCallResponse(call.descriptor.fqName, callId) { message ->
341349
handleServerStreamingMessage(message, channel, callable, call, callId)
342350
}
343351

352+
streamContext.valueOrNull?.launchIf({ outgoingStreamsAvailable }) {
353+
handleOutgoingStreams(it, serialFormat, call.descriptor.fqName)
354+
}
355+
344356
while (true) {
345357
val element = channel.receiveCatching()
346358
if (element.isClosed) {
@@ -358,11 +370,14 @@ public abstract class KrpcClient(
358370
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
359371

360372
throw e
373+
} finally {
374+
streamScope.close()
375+
channel.close()
361376
}
362377
}
363378
}
364379

365-
private suspend fun <T, @Rpc R: Any> handleServerStreamingMessage(
380+
private suspend fun <T, @Rpc R : Any> handleServerStreamingMessage(
366381
message: KrpcCallMessage,
367382
channel: Channel<T>,
368383
callable: RpcCallable<R>,
@@ -390,10 +405,10 @@ public abstract class KrpcClient(
390405

391406
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
392407
val value = runCatching {
393-
val serializerResult = nonSuspendingSerialFormat.serializersModule
408+
val serializerResult = noFlowSerialFormat.serializersModule
394409
.rpcSerializerForType(callable.returnType)
395410

396-
decodeMessageData(nonSuspendingSerialFormat, serializerResult, message)
411+
decodeMessageData(noFlowSerialFormat, serializerResult, message)
397412
}
398413

399414
@Suppress("UNCHECKED_CAST")

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ interface KrpcTestService : RemoteService {
4949
fun nonSuspendFlow(): Flow<Int>
5050
fun nonSuspendFlowErrorOnEmit(): Flow<Int>
5151
fun nonSuspendFlowErrorOnReturn(): Flow<Int>
52+
fun nonSuspendBidirectional(flow: Flow<Int>): Flow<Int>
53+
fun nonSuspendBidirectionalPayload(payloadWithStream: PayloadWithStream): Flow<Int>
54+
5255
suspend fun empty()
5356
suspend fun returnType(): String
5457
suspend fun simpleWithParams(name: String): String

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ class KrpcTestServiceBackend(override val coroutineContext: CoroutineContext) :
3737
error("nonSuspendFlowErrorOnReturn")
3838
}
3939

40+
override fun nonSuspendBidirectional(flow: Flow<Int>): Flow<Int> {
41+
return flow.map { it * 2 }
42+
}
43+
44+
override fun nonSuspendBidirectionalPayload(payloadWithStream: PayloadWithStream): Flow<Int> {
45+
return payloadWithStream.stream.map { it.length }
46+
}
47+
4048
@Suppress("detekt.EmptyFunctionBlock")
4149
override suspend fun empty() {}
4250

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,22 @@ abstract class KrpcTransportTestBase {
136136
}
137137
}
138138

139+
@Test
140+
fun nonSuspendBidirectional() = runTest {
141+
assertEquals(
142+
expected = List(10) { it * 2 },
143+
actual = client.nonSuspendBidirectional(List(10) { it }.asFlow()).toList(),
144+
)
145+
print(1)
146+
}
147+
148+
@Test
149+
fun nonSuspendBidirectionalPayload() = runTest {
150+
assertEquals(
151+
expected = List(3) { 2 },
152+
actual = client.nonSuspendBidirectionalPayload(payload(0)).toList(),
153+
)
154+
}
139155

140156
@Test
141157
fun empty() {

0 commit comments

Comments
 (0)