Skip to content

Commit 9858e56

Browse files
committed
close jetty connections on monix timeout (or other cancellation)
1 parent 1c305ff commit 9858e56

File tree

3 files changed

+141
-56
lines changed

3 files changed

+141
-56
lines changed
Lines changed: 54 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package io.udash
22
package rest.jetty
33

4-
import com.avsystem.commons._
4+
import com.avsystem.commons.*
55
import com.avsystem.commons.annotation.explicitGenerics
6-
import io.udash.rest.raw._
6+
import io.udash.rest.raw.*
77
import io.udash.utils.URLEncoder
88
import monix.eval.Task
9-
import org.eclipse.jetty.client.{BufferingResponseListener, BytesRequestContent, HttpClient, Result, StringRequestContent}
9+
import monix.execution.Callback
10+
import org.eclipse.jetty.client.*
1011
import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes}
1112

1213
import java.nio.charset.Charset
13-
import scala.concurrent.duration._
14-
import scala.util.{Failure, Success}
14+
import scala.concurrent.CancellationException
15+
import scala.concurrent.duration.*
1516

1617
object JettyRestClient {
1718
final val DefaultMaxResponseLength = 2 * 1024 * 1024
@@ -31,55 +32,57 @@ object JettyRestClient {
3132
maxResponseLength: Int = DefaultMaxResponseLength,
3233
timeout: Duration = DefaultTimeout
3334
): RawRest.HandleRequest =
34-
request => Task.async { callback =>
35-
val path = baseUrl + PlainValue.encodePath(request.parameters.path)
36-
val httpReq = client.newRequest(baseUrl).method(request.method.name)
35+
request => Task(client.newRequest(baseUrl).method(request.method.name)).flatMap { httpReq =>
36+
Task.async { (callback: Callback[Throwable, RestResponse]) =>
37+
val path = baseUrl + PlainValue.encodePath(request.parameters.path)
3738

38-
httpReq.path(path)
39-
request.parameters.query.entries.foreach {
40-
case (name, PlainValue(value)) => httpReq.param(name, value)
41-
}
42-
request.parameters.headers.entries.foreach {
43-
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
44-
}
45-
request.parameters.cookies.entries.foreach {
46-
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
47-
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
48-
}
49-
50-
request.body match {
51-
case HttpBody.Empty =>
52-
case tb: HttpBody.Textual =>
53-
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
54-
case bb: HttpBody.Binary =>
55-
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
56-
}
39+
httpReq.path(path)
40+
request.parameters.query.entries.foreach {
41+
case (name, PlainValue(value)) => httpReq.param(name, value)
42+
}
43+
request.parameters.headers.entries.foreach {
44+
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
45+
}
46+
request.parameters.cookies.entries.foreach {
47+
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
48+
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
49+
}
5750

58-
timeout match {
59-
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
60-
case _ =>
61-
}
51+
request.body match {
52+
case HttpBody.Empty =>
53+
case tb: HttpBody.Textual =>
54+
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
55+
case bb: HttpBody.Binary =>
56+
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
57+
}
6258

63-
httpReq.send(new BufferingResponseListener(maxResponseLength) {
64-
override def onComplete(result: Result): Unit =
65-
if (result.isSucceeded) {
66-
val httpResp = result.getResponse
67-
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
68-
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
69-
val body = (contentTypeOpt, charsetOpt) match {
70-
case (Opt(contentType), Opt(charset)) =>
71-
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
72-
case (Opt(contentType), Opt.Empty) =>
73-
HttpBody.binary(getContent, contentType)
74-
case _ =>
75-
HttpBody.Empty
76-
}
77-
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
78-
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
79-
callback(Success(response))
80-
} else {
81-
callback(Failure(result.getFailure))
59+
timeout match {
60+
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
61+
case _ =>
8262
}
83-
})
63+
64+
httpReq.send(new BufferingResponseListener(maxResponseLength) {
65+
override def onComplete(result: Result): Unit =
66+
if (result.isSucceeded) {
67+
val httpResp = result.getResponse
68+
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
69+
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
70+
val body = (contentTypeOpt, charsetOpt) match {
71+
case (Opt(contentType), Opt(charset)) =>
72+
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
73+
case (Opt(contentType), Opt.Empty) =>
74+
HttpBody.binary(getContent, contentType)
75+
case _ =>
76+
HttpBody.Empty
77+
}
78+
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
79+
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
80+
callback(Success(response))
81+
} else {
82+
callback(Failure(result.getFailure))
83+
}
84+
})
85+
}
86+
.doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled"))))
8487
}
8588
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.udash.rest.jetty
2+
3+
import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps
4+
import com.avsystem.commons.universalOps
5+
import io.udash.rest.jetty.CloseStaleJettyConnectionsOnMonixTimeout.RestApiWithNeverCounter
6+
import io.udash.rest.{DefaultRestApiCompanion, GET, RestServlet}
7+
import monix.eval.Task
8+
import monix.execution.atomic.Atomic
9+
import org.eclipse.jetty.client.HttpClient
10+
import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder}
11+
import org.eclipse.jetty.server.{NetworkConnector, Server}
12+
import org.scalatest.funsuite.AsyncFunSuite
13+
14+
import java.net.InetSocketAddress
15+
import scala.concurrent.Future
16+
import scala.concurrent.duration.{FiniteDuration, IntMult}
17+
18+
final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite {
19+
20+
test("close connection on monix task timeout") {
21+
import monix.execution.Scheduler.Implicits.global
22+
23+
val MaxConnections: Int = 1 // to timeout quickly
24+
val Connections: Int = 10 // > MaxConnections
25+
val RequestTimeout: FiniteDuration = 1.hour // no timeout
26+
val CallTimeout: FiniteDuration = 300.millis
27+
28+
29+
val server = new Server(new InetSocketAddress("localhost", 0)) {
30+
setHandler(
31+
new ServletContextHandler().setup(
32+
_.addServlet(
33+
new ServletHolder(
34+
RestServlet[RestApiWithNeverCounter](RestApiWithNeverCounter.Impl)
35+
),
36+
"/*",
37+
)
38+
)
39+
)
40+
start()
41+
}
42+
43+
val httpClient = new HttpClient() {
44+
setMaxConnectionsPerDestination(MaxConnections)
45+
setIdleTimeout(RequestTimeout.toMillis)
46+
start()
47+
}
48+
49+
val client = JettyRestClient[RestApiWithNeverCounter](
50+
client = httpClient,
51+
baseUri = server.getConnectors.head |> { case connector: NetworkConnector => s"http://${connector.getHost}:${connector.getLocalPort}" },
52+
maxResponseLength = Int.MaxValue, // to avoid unnecessary logs
53+
timeout = RequestTimeout,
54+
)
55+
56+
Task
57+
.traverse(List.range(0, Connections))(_ => Task.fromFuture(client.neverGet).timeout(CallTimeout).failed)
58+
.timeoutTo(Connections * CallTimeout + 500.millis, Task(fail("All connections should have been closed"))) // + 500 millis just in case
59+
.map(_ => assert(RestApiWithNeverCounter.Impl.counter.get() == Connections)) // neverGet should be called Connections times
60+
.guarantee(Task {
61+
server.stop()
62+
httpClient.stop()
63+
})
64+
.runToFuture
65+
}
66+
}
67+
68+
object CloseStaleJettyConnectionsOnMonixTimeout {
69+
sealed trait RestApiWithNeverCounter {
70+
final val counter = Atomic(0)
71+
@GET def neverGet: Future[Unit]
72+
}
73+
74+
object RestApiWithNeverCounter extends DefaultRestApiCompanion[RestApiWithNeverCounter] {
75+
final val Impl: RestApiWithNeverCounter = new RestApiWithNeverCounter {
76+
override def neverGet: Future[Unit] = {
77+
counter.increment()
78+
Future.never
79+
}
80+
}
81+
}
82+
}

rest/src/test/scala/io/udash/rest/RestTestApi.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package io.udash
22
package rest
33

4-
import com.avsystem.commons._
4+
import com.avsystem.commons.*
55
import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx}
66
import com.avsystem.commons.rpc.AsRawReal
7+
import com.avsystem.commons.serialization.*
78
import com.avsystem.commons.serialization.json.JsonStringOutput
8-
import com.avsystem.commons.serialization.{GenCodec, HasPolyGenCodec, flatten, name, whenAbsent}
9-
import io.udash.rest.openapi.adjusters._
10-
import io.udash.rest.openapi.{Header => OASHeader, _}
11-
import io.udash.rest.raw._
9+
import io.udash.rest.openapi.adjusters.*
10+
import io.udash.rest.openapi.{Header as OASHeader, *}
11+
import io.udash.rest.raw.*
1212
import monix.execution.{FutureUtils, Scheduler}
1313

1414
import scala.concurrent.Future

0 commit comments

Comments
 (0)