Skip to content

Commit 89ad188

Browse files
authored
Drop or rename TODOs and use logging for unexpected frame (#295)
1 parent cf292b6 commit 89ad188

File tree

27 files changed

+60
-64
lines changed

27 files changed

+60
-64
lines changed

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ internal class ConnectionInbound(
5454
}
5555

5656
private fun receiveError(cause: Throwable) {
57-
throw cause // TODO?
57+
throw cause
5858
}
5959

6060
fun createOperation(type: FrameType, requestJob: Job): ResponderOperation = when (type) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInitializer.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.core.*
2121
import io.rsocket.kotlin.frame.*
2222
import io.rsocket.kotlin.internal.io.*
23+
import io.rsocket.kotlin.logging.*
2324
import io.rsocket.kotlin.transport.*
2425
import kotlinx.coroutines.*
2526

27+
@RSocketLoggingApi
2628
@RSocketTransportApi
2729
internal abstract class ConnectionInitializer(
2830
private val isClient: Boolean,
2931
private val frameCodec: FrameCodec,
32+
private val frameLogger: Logger,
3033
private val connectionAcceptor: ConnectionAcceptor,
3134
private val interceptors: Interceptors,
3235
) {
@@ -42,11 +45,11 @@ internal abstract class ConnectionInitializer(
4245
else -> connection.acceptStream() ?: error("Initial stream should be received")
4346
}
4447
initialStream.setSendPriority(0)
45-
MultiplexedConnection(isClient, frameCodec, connection, initialStream, requestsScope)
48+
MultiplexedConnection(isClient, frameCodec, frameLogger, connection, initialStream, requestsScope)
4649
}
4750

4851
is RSocketSequentialConnection -> {
49-
SequentialConnection(isClient, frameCodec, connection, requestsScope)
52+
SequentialConnection(isClient, frameCodec, frameLogger, connection, requestsScope)
5053
}
5154
}
5255

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,22 @@
1616

1717
package io.rsocket.kotlin.connection
1818

19+
import io.rsocket.kotlin.*
1920
import io.rsocket.kotlin.frame.*
2021
import io.rsocket.kotlin.internal.*
22+
import io.rsocket.kotlin.logging.*
2123
import io.rsocket.kotlin.operation.*
2224
import io.rsocket.kotlin.payload.*
2325
import io.rsocket.kotlin.transport.*
2426
import kotlinx.coroutines.*
2527
import kotlinx.io.*
2628

29+
@RSocketLoggingApi
2730
@RSocketTransportApi
2831
internal class MultiplexedConnection(
2932
isClient: Boolean,
3033
frameCodec: FrameCodec,
34+
private val frameLogger: Logger,
3135
private val connection: RSocketMultiplexedConnection,
3236
private val initialStream: RSocketMultiplexedConnection.Stream,
3337
private val requestsScope: CoroutineScope,
@@ -146,7 +150,7 @@ internal class MultiplexedConnection(
146150
// request is cancelled during fragmentation
147151
is CancelFrame -> error("Request was cancelled by remote party")
148152
is RequestFrame -> {
149-
// TODO: extract assembly logic?
153+
// TODO[fragmentation]: extract assembly logic?
150154
when {
151155
// for RC, it could contain the complete flag
152156
// complete+follows=complete, "complete" overrides "follows" flag
@@ -194,7 +198,7 @@ internal class MultiplexedConnection(
194198
): Unit = coroutineScope {
195199
val outbound = Outbound(streamId, stream)
196200
val receiveJob = launch {
197-
val handler = OperationFrameHandler(operation)
201+
val handler = OperationFrameHandler(operation, frameLogger)
198202
try {
199203
while (true) {
200204
val frame = frameCodec.decodeFrame(

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package io.rsocket.kotlin.connection
1818

19+
import io.rsocket.kotlin.*
1920
import io.rsocket.kotlin.frame.*
21+
import io.rsocket.kotlin.logging.*
2022
import io.rsocket.kotlin.operation.*
2123
import io.rsocket.kotlin.payload.*
2224
import io.rsocket.kotlin.transport.*
2325
import kotlinx.coroutines.*
2426
import kotlinx.io.*
2527

28+
@RSocketLoggingApi
2629
@RSocketTransportApi
2730
internal class SequentialConnection(
2831
isClient: Boolean,
2932
frameCodec: FrameCodec,
33+
private val frameLogger: Logger,
3034
private val connection: RSocketSequentialConnection,
3135
private val requestsScope: CoroutineScope,
3236
) : ConnectionOutbound(frameCodec) {
@@ -60,7 +64,7 @@ internal class SequentialConnection(
6064
): Job = requestsScope.launch(start = CoroutineStart.ATOMIC) {
6165
operation.handleExecutionFailure(requestPayload) {
6266
ensureActive() // because of atomic start
63-
val streamId = storage.createStream(OperationFrameHandler(operation))
67+
val streamId = storage.createStream(OperationFrameHandler(operation, frameLogger))
6468
try {
6569
operation.execute(Outbound(streamId), requestPayload)
6670
} finally {
@@ -116,7 +120,8 @@ internal class SequentialConnection(
116120
when {
117121
frame.follows -> ResponderInboundWrapper(connectionInbound, operationData)
118122
else -> acceptRequest(connectionInbound, operationData)
119-
}
123+
},
124+
frameLogger
120125
)
121126
if (storage.acceptStream(streamId, handler)) {
122127
// for fragmentation
@@ -159,7 +164,7 @@ internal class SequentialConnection(
159164
)
160165
)
161166
// close old handler
162-
storage.replaceStream(operationData.streamId, OperationFrameHandler(operation))?.close()
167+
storage.replaceStream(operationData.streamId, OperationFrameHandler(operation, frameLogger))?.close()
163168
} else {
164169
// should not happen really
165170
storage.removeStream(operationData.streamId)?.close()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class RSocketConnector internal constructor(
6666
private inner class SetupConnection() : ConnectionInitializer(
6767
isClient = true,
6868
frameCodec = FrameCodec(maxFragmentSize),
69+
frameLogger = frameLogger,
6970
connectionAcceptor = acceptor,
7071
interceptors = interceptors
7172
) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class RSocketServer internal constructor(
6666
private inner class AcceptConnection(acceptor: ConnectionAcceptor) : ConnectionInitializer(
6767
isClient = false,
6868
frameCodec = FrameCodec(maxFragmentSize),
69+
frameLogger = frameLogger,
6970
connectionAcceptor = acceptor,
7071
interceptors = interceptors
7172
) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,8 @@ internal sealed class Frame : AutoCloseable {
4141
}
4242

4343
internal fun dump(length: Long): String = buildString {
44-
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId).append(" Length: ").append(length)
44+
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId)
45+
if (length != -1L) append(" Length: ").append(length)
4546
append("\nFlags: 0b").append(flags.toBinaryString()).append(" (").apply { appendFlags() }.append(")")
4647
appendSelf()
4748
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameCodec.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,5 +32,5 @@ internal class FrameCodec(
3232

3333
fun encodeFrame(frame: Frame): Buffer = frame.toBuffer()
3434

35-
// TODO: move fragmentation logic here or into separate class?
35+
// TODO[fragmentation]: move fragmentation logic here or into separate class?
3636
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,7 +78,7 @@ internal fun Source.readRequest(
7878
return RequestFrame(type, streamId, fragmentFollows, complete, next, initialRequest, payload)
7979
}
8080

81-
//TODO rename or remove on fragmentation implementation
81+
//TODO[fragmentation] rename or remove on fragmentation implementation
8282
internal fun RequestFireAndForgetFrame(streamId: Int, payload: Payload): RequestFrame =
8383
RequestFrame(FrameType.RequestFnF, streamId, false, false, false, 0, payload)
8484

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ private const val HonorLeaseFlag = 64
2525
private const val ResumeEnabledFlag = 128
2626

2727
internal class SetupFrame(
28-
val version: Version, //TODO check
28+
val version: Version,
2929
val honorLease: Boolean,
3030
val keepAlive: KeepAlive,
3131
val resumeToken: Buffer?,
@@ -104,7 +104,7 @@ private fun Source.readStringMimeType(): String {
104104
}
105105

106106
private fun Sink.writeStringMimeType(mimeType: String) {
107-
val bytes = mimeType.encodeToByteArray() //TODO check
107+
val bytes = mimeType.encodeToByteArray()
108108
writeByte(bytes.size.toByte())
109109
write(bytes)
110110
}

0 commit comments

Comments
 (0)