[BUG] LEAK: ByteBuf.release() was not called before it's garbage-collected #4539

akozich opened this issue Apr 22, 2025 · 1 comment · May be fixed by #4543

akozich commented Apr 22, 2025

Tapir version: 1.11.15

Scala version: 3.5.2

NettyFutureServer is used. We expose a public POST endpoint accepting a payload from mobile clients. Due to the nature of mobile clients, the connection is sometimes closed in the middle of the request body. We observe the following error from time to time.

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	Hint: 'HttpStreamsServerHandler#0-body-publisher' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	org.playframework.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:194)
	org.playframework.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
	org.playframework.netty.http.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:96)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	io.opentelemetry.javaagent.shaded.instrumentation.netty.v4_1.internal.server.HttpServerRequestTracingHandler.channelRead(HttpServerRequestTracingHandler.java:44)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:793)
	io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.handle(AbstractEpollChannel.java:471)
	io.netty.channel.epoll.EpollIoHandler$DefaultEpollIoRegistration.handle(EpollIoHandler.java:307)
	io.netty.channel.epoll.EpollIoHandler.processReady(EpollIoHandler.java:489)
	io.netty.channel.epoll.EpollIoHandler.run(EpollIoHandler.java:444)
	io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:204)
	io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:175)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1073)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Unknown Source)

What is the problem?

SimpleSubscriber checks the size of the received byte buffer. If the body has been fully received, it releases the buffer. Otherwise, it stores a reference to the buffer so it can be merged with upcoming chunks. Probably, the Future returned from SimpleSubscriber is garbage-collected before the close signal is received, in which case the byte buffer is collected without being released.
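
Below is a minimal sketch of the suspected pattern, written as a hypothetical simplification for illustration rather than Tapir's actual SimpleSubscriber code. Partial chunks are retained for later merging, so every termination path has to release them explicitly; if the close signal never reaches the subscriber, the retained buffers are garbage-collected with a non-zero reference count, which is exactly what the LEAK message reports.

import io.netty.buffer.ByteBuf
import org.reactivestreams.{Subscriber, Subscription}
import scala.collection.mutable.ListBuffer
import scala.concurrent.Promise

// Hypothetical simplification of a body-accumulating subscriber.
final class AccumulatingSubscriber extends Subscriber[ByteBuf] {
  private val chunks = ListBuffer.empty[ByteBuf]
  private val body = Promise[Array[Byte]]()

  override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)

  // Partial body: keep a reference to the chunk (refCnt stays > 0).
  override def onNext(buf: ByteBuf): Unit = chunks += buf

  // Normal path: merge the chunks and release every one of them.
  override def onComplete(): Unit = {
    val out = new Array[Byte](chunks.map(_.readableBytes()).sum)
    var pos = 0
    chunks.foreach { b =>
      val n = b.readableBytes()
      b.readBytes(out, pos, n)
      b.release()
      pos += n
    }
    chunks.clear()
    body.success(out)
  }

  // Failure path: if this cleanup is missing, or onError is never invoked
  // because the connection died mid-body, the buffered chunks are GC'd
  // without release() -- the leak reported above.
  override def onError(t: Throwable): Unit = {
    chunks.foreach(_.release())
    chunks.clear()
    body.failure(t)
  }
}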

How to reproduce?

Maybe you can provide code to reproduce the problem?

Here's the server:

import io.circe.derivation.Configuration
import io.circe.derivation.ConfiguredCodec
import sttp.model.StatusCode
import sttp.tapir.*
import sttp.tapir.Schema
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.server.netty.NettyFutureServer
import sttp.tapir.server.netty.NettyFutureServerOptions

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

given Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults

case class PermissionsInput(
  permissions: Seq[PermissionInput]
) derives ConfiguredCodec, Schema

case class PermissionInput(
  platform: String,
  id: String,
  accessType: String,
  state: String
) derives ConfiguredCodec, Schema

object Server {

  private val permissionsEndpoint =
    endpoint.put
      .securityIn(header[Option[String]]("X-Token-1"))
      .in("permissions")
      .in(jsonBody[PermissionsInput])
      // errorOut is needed so the security logic below can return
      // Left(StatusCode.Unauthorized)
      .errorOut(statusCode)
      .out(statusCode)

  private val permissionsServerEndpoint = permissionsEndpoint
    .serverSecurityLogic {
      case Some(value) => Future.successful(Right(value))
      case None        => Future.successful(Left(StatusCode.Unauthorized))
    }
    .serverLogicSuccess { (token: String) => (permissionsInput: PermissionsInput) =>
      Thread.sleep(100) // simulate processing time (blocks; fine for a repro)
      println(permissionsInput.permissions.size)
      Future.successful(StatusCode.Created)
    }

  private val server = NettyFutureServer()
    .host("0.0.0.0")
    .options(
      NettyFutureServerOptions.customiseInterceptors.serverLog(
        NettyFutureServerOptions.defaultServerLog.logAllDecodeFailures(true).logWhenHandled(true)
      ).options
    )
    .port(8080)
    .addEndpoints(permissionsServerEndpoint :: Nil)

  def main(args: Array[String]): Unit = {
    server.start().andThen {
      case scala.util.Success(_) =>
        println("Server started on port 8080")
      case scala.util.Failure(exception) =>
        println(s"Failed to start server: ${exception.getMessage}")
    }
  }

}

And here's the client sending truncated requests:

import java.io.PrintWriter
import java.net.Socket
import java.util.concurrent.Executors

object TruncatedRequestsSender extends App {
  private val pool = Executors.newFixedThreadPool(4)

  private val client: Runnable = { () =>
    var i = 0

    while (true) {
      println(s"${Thread.currentThread().getName}:$i")

      val socket = new Socket("localhost", 8080)
      val out = new PrintWriter(socket.getOutputStream, true)

      // Declare Content-Length: 500 but send only a prefix of the body,
      // then close the socket mid-request.
      out.print(
        """PUT /permissions HTTP/1.1
          |Host: example.com
          |Content-Type: application/json
          |Content-Length: 500
          |
          |{
          |  "permissions": [
          |    {
          |      "id": "sleep",""".stripMargin
      )

      out.close()
      socket.close()

      Thread.sleep(10)

      i += 1
    }
  }
  pool.submit(client)
  pool.submit(client)
  pool.submit(client)
  pool.submit(client)
}
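
Side note: Netty's leak detector only samples buffer allocations by default, so the LEAK message can take a while to show up. Running the server with -Dio.netty.leakDetection.level=paranoid makes every buffer tracked and surfaces the leak much faster while reproducing.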

akozich commented Apr 23, 2025

I managed to find the root cause. In my real endpoint, the security logic is based on two headers, not the body. If the security logic fails, ServerInterpreter runs this code:

// 4. running the security logic
securityLogicResult <- ResultOrValue(
  se.securityLogic(monad)(a).map(Right(_): Either[RequestResult[B], Either[E, U]]).handleError { case t: Throwable =>
    endpointHandler(monad.error(t), endpointInterceptors)
      .onSecurityFailure(SecurityFailureContext(se, a, request))
      .map(r => Left(RequestResult.Response(r)): Either[RequestResult[B], Either[E, U]])
  }
)
response <- securityLogicResult match {
  case Left(e) =>
    resultOrValueFrom.value(
      endpointHandler(
        responder(defaultErrorStatusCode)(request, model.ValuedEndpointOutput(se.endpoint.errorOutput, e)),
        endpointInterceptors
      )
        .onSecurityFailure(SecurityFailureContext(se, a, request))
        .map(r => RequestResult.Response(r): RequestResult[B])
    )

In this case the request body is not consumed and is reported as a leak.

Inside or after onSecurityFailure, the request body should be consumed and discarded.
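
One possible shape of the fix, sketched here with a hypothetical drainAndRelease helper (the actual change is in the linked PR): on security failure, subscribe to the body publisher and release every chunk it delivers.

import io.netty.util.ReferenceCountUtil
import org.reactivestreams.{Publisher, Subscriber, Subscription}

// Hypothetical helper: consume whatever remains of the request body and
// release each reference-counted chunk so the leak detector has nothing
// to report. Chunks that are not reference-counted are ignored safely.
def drainAndRelease[A](body: Publisher[A]): Unit =
  body.subscribe(new Subscriber[A] {
    override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
    override def onNext(chunk: A): Unit = { ReferenceCountUtil.release(chunk); () }
    override def onError(t: Throwable): Unit = () // stream failed; nothing more arrives
    override def onComplete(): Unit = ()          // everything released
  })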

@akozich akozich linked a pull request Apr 24, 2025 that will close this issue