NettyFutureServer is used. We expose a public POST endpoint accepting a payload from mobile clients. Because mobile connections are unreliable, the connection is sometimes closed in the middle of the request body. We observe the following error from time to time:
LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
Hint: 'HttpStreamsServerHandler#0-body-publisher' will handle the message from this point.
io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
org.playframework.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:194)
org.playframework.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
org.playframework.netty.http.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:96)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
io.opentelemetry.javaagent.shaded.instrumentation.netty.v4_1.internal.server.HttpServerRequestTracingHandler.channelRead(HttpServerRequestTracingHandler.java:44)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:793)
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.handle(AbstractEpollChannel.java:471)
io.netty.channel.epoll.EpollIoHandler$DefaultEpollIoRegistration.handle(EpollIoHandler.java:307)
io.netty.channel.epoll.EpollIoHandler.processReady(EpollIoHandler.java:489)
io.netty.channel.epoll.EpollIoHandler.run(EpollIoHandler.java:444)
io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:204)
io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:175)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1073)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Unknown Source)
What is the problem?
SimpleSubscriber checks the size of each received byte buffer. If the body has been fully received, it releases the byte buffer. Otherwise, it keeps a reference to the buffer so that it can be merged with the upcoming chunks. Probably, the Future returned from the SimpleSubscriber is garbage-collected before the close signal arrives, and in that case the byte buffer is collected without being released.
How to reproduce?
Maybe you can provide code to reproduce the problem?
Here's the server:
import io.circe.derivation.Configuration
import io.circe.derivation.ConfiguredCodec
import sttp.model.StatusCode
import sttp.tapir.*
import sttp.tapir.Schema
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.server.netty.NettyFutureServer
import sttp.tapir.server.netty.NettyFutureServerOptions

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

given Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults

case class PermissionsInput(
  permissions: Seq[PermissionInput]
) derives ConfiguredCodec, Schema

case class PermissionInput(
  platform: String,
  id: String,
  accessType: String,
  state: String
) derives ConfiguredCodec, Schema

object Server {
  private val permissionsEndpoint =
    endpoint.put
      .securityIn(header[Option[String]]("X-Token-1"))
      .in("permissions")
      .in(jsonBody[PermissionsInput])
      .out(statusCode)

  private val permissionsServerEndpoint = permissionsEndpoint
    .serverSecurityLogic {
      case Some(value) => Future.successful(Right(value))
      case None => Future.successful(Left(StatusCode.Unauthorized))
    }
    .serverLogicSuccess { (token: String) => (permissionsInput: PermissionsInput) =>
      Thread.sleep(100)
      println(permissionsInput.permissions.size)
      Future.successful(StatusCode.Created)
    }

  private val server = NettyFutureServer()
    .host("0.0.0.0")
    .options(
      NettyFutureServerOptions.customiseInterceptors.serverLog(
        NettyFutureServerOptions.defaultServerLog.logAllDecodeFailures(true).logWhenHandled(true)
      ).options
    )
    .port(8080)
    .addEndpoints(permissionsServerEndpoint :: Nil)

  def main(args: Array[String]): Unit = {
    server.start().andThen {
      case scala.util.Success(_) =>
        println("Server started on port 8080")
      case scala.util.Failure(exception) =>
        println(s"Failed to start server: ${exception.getMessage}")
    }
  }
}
And here's the client sending truncated requests:
import java.io.PrintWriter
import java.net.Socket
import java.util.concurrent.Executors

object TruncatedRequestsSender extends App {
  private val pool = Executors.newFixedThreadPool(4)

  private val client: Runnable = { () =>
    var i = 0
    while (true) {
      println(s"${Thread.currentThread().getName}:$i")
      val socket = new Socket("localhost", 8080)
      val out = new PrintWriter(socket.getOutputStream, true)
      out.print(
        """PUT /permissions HTTP/1.1
          |Host: example.com
          |Content-Type: application/json
          |Content-Length: 500
          |
          |{
          | "permissions": [
          | {
          | "id": "sleep",""".stripMargin
      )
      out.close()
      socket.close()
      Thread.sleep(10)
      i += 1
    }
  }

  pool.submit(client)
  pool.submit(client)
  pool.submit(client)
  pool.submit(client)
}
I managed to find the root cause. In my real endpoint the security logic is based on two headers, not the body. If the security logic fails, ServerInterpreter calls this code:
// 4. running the security logic
securityLogicResult <- ResultOrValue(
  se.securityLogic(monad)(a).map(Right(_): Either[RequestResult[B], Either[E, U]]).handleError { case t: Throwable =>
    endpointHandler(monad.error(t), endpointInterceptors)
      .onSecurityFailure(SecurityFailureContext(se, a, request))
      .map(r => Left(RequestResult.Response(r)): Either[RequestResult[B], Either[E, U]])
  }
)
response <- securityLogicResult match {
  case Left(e) =>
    resultOrValueFrom.value(
      endpointHandler(
        responder(defaultErrorStatusCode)(request, model.ValuedEndpointOutput(se.endpoint.errorOutput, e)),
        endpointInterceptors
      )
        .onSecurityFailure(SecurityFailureContext(se, a, request))
        .map(r => RequestResult.Response(r): RequestResult[B])
    )
In this case the request body is not consumed, and the leak detector reports it as a leak.
Inside or after onSecurityFailure, the request body should be consumed and discarded.
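To illustrate the idea of draining an unread body, here is a minimal, dependency-free sketch in Scala 3. It is a toy model and not Tapir's or Netty's code: `RefCountedChunk` stands in for a reference-counted Netty buffer, and `discardBody` and `handleRequest` are hypothetical names invented for this example. The only point it makes is that the security-failure branch has to release the chunks itself, because nothing downstream will ever read them.

```scala
// Toy model only. RefCountedChunk, discardBody and handleRequest are
// hypothetical names for illustration; they are not Tapir or Netty APIs.
final class RefCountedChunk(val bytes: Array[Byte]):
  private var refs = 1 // a freshly created chunk starts with one reference

  def refCnt: Int = refs

  def release(): Unit =
    require(refs > 0, "chunk already released")
    refs -= 1

/** Releases every chunk of a body that nobody is going to read. */
def discardBody(body: Iterator[RefCountedChunk]): Unit =
  body.foreach(_.release())

/** Returns an HTTP status code; the body is only read when the security check passes. */
def handleRequest(token: Option[String], body: Iterator[RefCountedChunk]): Int =
  token match
    case Some(_) =>
      // Security passed: consume the body, releasing each chunk after use.
      val size = body.map { chunk =>
        val n = chunk.bytes.length
        chunk.release()
        n
      }.sum
      if size > 0 then 201 else 400
    case None =>
      // Security failed: the body is never decoded, so drain it explicitly.
      discardBody(body)
      401

@main def demo(): Unit =
  val chunks = List(
    RefCountedChunk("{\"permissions\": [".getBytes),
    RefCountedChunk("{\"id\": \"sleep\",".getBytes)
  )
  val status = handleRequest(None, chunks.iterator)
  println(s"status=$status, refCnts=${chunks.map(_.refCnt)}")
```

If the `discardBody` call is removed from the `None` branch, both chunks keep a reference count of 1 after the response is produced, which is the situation the leak detector complains about in the trace above.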
Tapir version: 1.11.15
Scala version: 3.5.2