From 9858e567b2bcd414740678f31912442c04c998cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Wed, 11 Dec 2024 13:58:31 +0100 Subject: [PATCH 1/8] close jetty connections on monix timeout (or other cancellation) --- .../io/udash/rest/jetty/JettyRestClient.scala | 105 +++++++++--------- ...eStaleJettyConnectionsOnMonixTimeout.scala | 82 ++++++++++++++ .../scala/io/udash/rest/RestTestApi.scala | 10 +- 3 files changed, 141 insertions(+), 56 deletions(-) create mode 100644 rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala diff --git a/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala b/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala index ed8c4e6af..86f1118a1 100644 --- a/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala +++ b/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala @@ -1,17 +1,18 @@ package io.udash package rest.jetty -import com.avsystem.commons._ +import com.avsystem.commons.* import com.avsystem.commons.annotation.explicitGenerics -import io.udash.rest.raw._ +import io.udash.rest.raw.* import io.udash.utils.URLEncoder import monix.eval.Task -import org.eclipse.jetty.client.{BufferingResponseListener, BytesRequestContent, HttpClient, Result, StringRequestContent} +import monix.execution.Callback +import org.eclipse.jetty.client.* import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes} import java.nio.charset.Charset -import scala.concurrent.duration._ -import scala.util.{Failure, Success} +import scala.concurrent.CancellationException +import scala.concurrent.duration.* object JettyRestClient { final val DefaultMaxResponseLength = 2 * 1024 * 1024 @@ -31,55 +32,57 @@ object JettyRestClient { maxResponseLength: Int = DefaultMaxResponseLength, timeout: Duration = DefaultTimeout ): RawRest.HandleRequest = - request => Task.async { callback => - val path = baseUrl + PlainValue.encodePath(request.parameters.path) - val httpReq = client.newRequest(baseUrl).method(request.method.name) + request => Task(client.newRequest(baseUrl).method(request.method.name)).flatMap { httpReq => + Task.async { (callback: Callback[Throwable, RestResponse]) => + val path = baseUrl + PlainValue.encodePath(request.parameters.path) - httpReq.path(path) - request.parameters.query.entries.foreach { - case (name, PlainValue(value)) => httpReq.param(name, value) - } - request.parameters.headers.entries.foreach { - case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value)) - } - request.parameters.cookies.entries.foreach { - case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build( - URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build()) - } - - request.body match { - case HttpBody.Empty => - case tb: HttpBody.Textual => - httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset))) - case bb: HttpBody.Binary => - httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes)) - } + httpReq.path(path) + request.parameters.query.entries.foreach { + case (name, PlainValue(value)) => httpReq.param(name, value) + } + request.parameters.headers.entries.foreach { + case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value)) + } + request.parameters.cookies.entries.foreach { + case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build( + URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build()) + } - timeout match { - case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit) - case _ => - } + request.body match { + case HttpBody.Empty => + case tb: HttpBody.Textual => + httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset))) + case bb: HttpBody.Binary => + httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes)) + } - httpReq.send(new BufferingResponseListener(maxResponseLength) { - override def onComplete(result: Result): Unit = - if (result.isSucceeded) { - val httpResp = result.getResponse - val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt - val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType) - val body = (contentTypeOpt, charsetOpt) match { - case (Opt(contentType), Opt(charset)) => - HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset) - case (Opt(contentType), Opt.Empty) => - HttpBody.binary(getContent, contentType) - case _ => - HttpBody.Empty - } - val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList - val response = RestResponse(httpResp.getStatus, IMapping(headers), body) - callback(Success(response)) - } else { - callback(Failure(result.getFailure)) + timeout match { + case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit) + case _ => } - }) + + httpReq.send(new BufferingResponseListener(maxResponseLength) { + override def onComplete(result: Result): Unit = + if (result.isSucceeded) { + val httpResp = result.getResponse + val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt + val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType) + val body = (contentTypeOpt, charsetOpt) match { + case (Opt(contentType), Opt(charset)) => + HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset) + case (Opt(contentType), Opt.Empty) => + HttpBody.binary(getContent, contentType) + case _ => + HttpBody.Empty + } + val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList + val response = RestResponse(httpResp.getStatus, IMapping(headers), body) + callback(Success(response)) + } else { + callback(Failure(result.getFailure)) + } + }) + } + .doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled")))) } } diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala new file mode 100644 index 000000000..af8c764b2 --- /dev/null +++ b/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala @@ -0,0 +1,82 @@ +package io.udash.rest.jetty + +import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps +import com.avsystem.commons.universalOps +import io.udash.rest.jetty.CloseStaleJettyConnectionsOnMonixTimeout.RestApiWithNeverCounter +import io.udash.rest.{DefaultRestApiCompanion, GET, RestServlet} +import monix.eval.Task +import monix.execution.atomic.Atomic +import org.eclipse.jetty.client.HttpClient +import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder} +import org.eclipse.jetty.server.{NetworkConnector, Server} +import org.scalatest.funsuite.AsyncFunSuite + +import java.net.InetSocketAddress +import scala.concurrent.Future +import scala.concurrent.duration.{FiniteDuration, IntMult} + +final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite { + + test("close connection on monix task timeout") { + import monix.execution.Scheduler.Implicits.global + + val MaxConnections: Int = 1 // to timeout quickly + val Connections: Int = 10 // > MaxConnections + val RequestTimeout: FiniteDuration = 1.hour // no timeout + val CallTimeout: FiniteDuration = 300.millis + + + val server = new Server(new InetSocketAddress("localhost", 0)) { + setHandler( + new ServletContextHandler().setup( + _.addServlet( + new ServletHolder( + RestServlet[RestApiWithNeverCounter](RestApiWithNeverCounter.Impl) + ), + "/*", + ) + ) + ) + start() + } + + val httpClient = new HttpClient() { + setMaxConnectionsPerDestination(MaxConnections) + setIdleTimeout(RequestTimeout.toMillis) + start() + } + + val client = JettyRestClient[RestApiWithNeverCounter]( + client = httpClient, + baseUri = server.getConnectors.head |> { case connector: NetworkConnector => s"http://${connector.getHost}:${connector.getLocalPort}" }, + maxResponseLength = Int.MaxValue, // to avoid unnecessary logs + timeout = RequestTimeout, + ) + + Task + .traverse(List.range(0, Connections))(_ => Task.fromFuture(client.neverGet).timeout(CallTimeout).failed) + .timeoutTo(Connections * CallTimeout + 500.millis, Task(fail("All connections should have been closed"))) // + 500 millis just in case + .map(_ => assert(RestApiWithNeverCounter.Impl.counter.get() == Connections)) // neverGet should be called Connections times + .guarantee(Task { + server.stop() + httpClient.stop() + }) + .runToFuture + } +} + +object CloseStaleJettyConnectionsOnMonixTimeout { + sealed trait RestApiWithNeverCounter { + final val counter = Atomic(0) + @GET def neverGet: Future[Unit] + } + + object RestApiWithNeverCounter extends DefaultRestApiCompanion[RestApiWithNeverCounter] { + final val Impl: RestApiWithNeverCounter = new RestApiWithNeverCounter { + override def neverGet: Future[Unit] = { + counter.increment() + Future.never + } + } + } +} diff --git a/rest/src/test/scala/io/udash/rest/RestTestApi.scala b/rest/src/test/scala/io/udash/rest/RestTestApi.scala index 50409fe7c..57cc8f0bd 100644 --- a/rest/src/test/scala/io/udash/rest/RestTestApi.scala +++ b/rest/src/test/scala/io/udash/rest/RestTestApi.scala @@ -1,14 +1,14 @@ package io.udash package rest -import com.avsystem.commons._ +import com.avsystem.commons.* import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx} import com.avsystem.commons.rpc.AsRawReal +import com.avsystem.commons.serialization.* import com.avsystem.commons.serialization.json.JsonStringOutput -import com.avsystem.commons.serialization.{GenCodec, HasPolyGenCodec, flatten, name, whenAbsent} -import io.udash.rest.openapi.adjusters._ -import io.udash.rest.openapi.{Header => OASHeader, _} -import io.udash.rest.raw._ +import io.udash.rest.openapi.adjusters.* +import io.udash.rest.openapi.{Header as OASHeader, *} +import io.udash.rest.raw.* import monix.execution.{FutureUtils, Scheduler} import scala.concurrent.Future From 48ad7b0fa1d8b09fcded827b63c580db1f2cce2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Tue, 17 Dec 2024 09:51:09 +0100 Subject: [PATCH 2/8] add test case for non-timeout cancellation --- ...eStaleJettyConnectionsOnMonixTimeout.scala | 52 +++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala index af8c764b2..9828bce93 100644 --- a/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala +++ b/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala @@ -9,24 +9,29 @@ import monix.execution.atomic.Atomic import org.eclipse.jetty.client.HttpClient import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder} import org.eclipse.jetty.server.{NetworkConnector, Server} +import org.scalatest.BeforeAndAfterEach import org.scalatest.funsuite.AsyncFunSuite import java.net.InetSocketAddress import scala.concurrent.Future import scala.concurrent.duration.{FiniteDuration, IntMult} -final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite { +final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite with BeforeAndAfterEach { - test("close connection on monix task timeout") { - import monix.execution.Scheduler.Implicits.global + import monix.execution.Scheduler.Implicits.global - val MaxConnections: Int = 1 // to timeout quickly - val Connections: Int = 10 // > MaxConnections - val RequestTimeout: FiniteDuration = 1.hour // no timeout - val CallTimeout: FiniteDuration = 300.millis + private val MaxConnections: Int = 1 // to timeout quickly + private val Connections: Int = 10 // > MaxConnections + private val RequestTimeout: FiniteDuration = 1.hour // no timeout + private val CallTimeout: FiniteDuration = 300.millis + private var server: Server = _ + private var httpClient: HttpClient = _ + private var client: RestApiWithNeverCounter = _ - val server = new Server(new InetSocketAddress("localhost", 0)) { + override def beforeEach(): Unit = { + super.beforeEach() + server = new Server(new InetSocketAddress("localhost", 0)) { setHandler( new ServletContextHandler().setup( _.addServlet( @@ -40,32 +45,49 @@ final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite { start() } - val httpClient = new HttpClient() { + httpClient = new HttpClient() { setMaxConnectionsPerDestination(MaxConnections) setIdleTimeout(RequestTimeout.toMillis) start() } - val client = JettyRestClient[RestApiWithNeverCounter]( + client = JettyRestClient[RestApiWithNeverCounter]( client = httpClient, baseUri = server.getConnectors.head |> { case connector: NetworkConnector => s"http://${connector.getHost}:${connector.getLocalPort}" }, maxResponseLength = Int.MaxValue, // to avoid unnecessary logs timeout = RequestTimeout, ) + } + override def afterEach(): Unit = { + RestApiWithNeverCounter.Impl.counter.set(0) + server.stop() + httpClient.stop() + super.afterEach() + } + + test("close connection on monix task timeout") { Task .traverse(List.range(0, Connections))(_ => Task.fromFuture(client.neverGet).timeout(CallTimeout).failed) .timeoutTo(Connections * CallTimeout + 500.millis, Task(fail("All connections should have been closed"))) // + 500 millis just in case .map(_ => assert(RestApiWithNeverCounter.Impl.counter.get() == Connections)) // neverGet should be called Connections times - .guarantee(Task { - server.stop() - httpClient.stop() - }) + .runToFuture + } + + test("close connection on monix task cancellation") { + Task + .traverse(List.range(0, Connections)) { i => + val cancelable = Task.fromFuture(client.neverGet).runAsync(_ => ()) + Task.sleep(100.millis) + .restartUntil(_ => RestApiWithNeverCounter.Impl.counter.get() >= i) + .map(_ => cancelable.cancel()) + } + .map(_ => assert(RestApiWithNeverCounter.Impl.counter.get() == Connections)) .runToFuture } } -object CloseStaleJettyConnectionsOnMonixTimeout { +private object CloseStaleJettyConnectionsOnMonixTimeout { sealed trait RestApiWithNeverCounter { final val counter = Atomic(0) @GET def neverGet: Future[Unit] From 0b2b5cdec27a855bb21248d985463ca90ed5e0ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Tue, 17 Dec 2024 12:01:18 +0100 Subject: [PATCH 3/8] make the "close connection on monix task *" tests generic --- ...eStaleJettyConnectionsOnMonixTimeout.scala | 104 ------------------ .../udash/rest/jetty/JettyRestCallTest.scala | 4 +- .../scala/io/udash/rest/RestApiTest.scala | 38 ++++++- .../scala/io/udash/rest/RestTestApi.scala | 8 +- 4 files changed, 47 insertions(+), 107 deletions(-) delete mode 100644 rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala deleted file mode 100644 index 9828bce93..000000000 --- a/rest/jetty/src/test/scala/io/udash/rest/jetty/CloseStaleJettyConnectionsOnMonixTimeout.scala +++ /dev/null @@ -1,104 +0,0 @@ -package io.udash.rest.jetty - -import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps -import com.avsystem.commons.universalOps -import io.udash.rest.jetty.CloseStaleJettyConnectionsOnMonixTimeout.RestApiWithNeverCounter -import io.udash.rest.{DefaultRestApiCompanion, GET, RestServlet} -import monix.eval.Task -import monix.execution.atomic.Atomic -import org.eclipse.jetty.client.HttpClient -import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder} -import org.eclipse.jetty.server.{NetworkConnector, Server} -import org.scalatest.BeforeAndAfterEach -import org.scalatest.funsuite.AsyncFunSuite - -import java.net.InetSocketAddress -import scala.concurrent.Future -import scala.concurrent.duration.{FiniteDuration, IntMult} - -final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite with BeforeAndAfterEach { - - import monix.execution.Scheduler.Implicits.global - - private val MaxConnections: Int = 1 // to timeout quickly - private val Connections: Int = 10 // > MaxConnections - private val RequestTimeout: FiniteDuration = 1.hour // no timeout - private val CallTimeout: FiniteDuration = 300.millis - - private var server: Server = _ - private var httpClient: HttpClient = _ - private var client: RestApiWithNeverCounter = _ - - override def beforeEach(): Unit = { - super.beforeEach() - server = new Server(new InetSocketAddress("localhost", 0)) { - setHandler( - new ServletContextHandler().setup( - _.addServlet( - new ServletHolder( - RestServlet[RestApiWithNeverCounter](RestApiWithNeverCounter.Impl) - ), - "/*", - ) - ) - ) - start() - } - - httpClient = new HttpClient() { - setMaxConnectionsPerDestination(MaxConnections) - setIdleTimeout(RequestTimeout.toMillis) - start() - } - - client = JettyRestClient[RestApiWithNeverCounter]( - client = httpClient, - baseUri = server.getConnectors.head |> { case connector: NetworkConnector => s"http://${connector.getHost}:${connector.getLocalPort}" }, - maxResponseLength = Int.MaxValue, // to avoid unnecessary logs - timeout = RequestTimeout, - ) - } - - override def afterEach(): Unit = { - RestApiWithNeverCounter.Impl.counter.set(0) - server.stop() - httpClient.stop() - super.afterEach() - } - - test("close connection on monix task timeout") { - Task - .traverse(List.range(0, Connections))(_ => Task.fromFuture(client.neverGet).timeout(CallTimeout).failed) - .timeoutTo(Connections * CallTimeout + 500.millis, Task(fail("All connections should have been closed"))) // + 500 millis just in case - .map(_ => assert(RestApiWithNeverCounter.Impl.counter.get() == Connections)) // neverGet should be called Connections times - .runToFuture - } - - test("close connection on monix task cancellation") { - Task - .traverse(List.range(0, Connections)) { i => - val cancelable = Task.fromFuture(client.neverGet).runAsync(_ => ()) - Task.sleep(100.millis) - .restartUntil(_ => RestApiWithNeverCounter.Impl.counter.get() >= i) - .map(_ => cancelable.cancel()) - } - .map(_ => assert(RestApiWithNeverCounter.Impl.counter.get() == Connections)) - .runToFuture - } -} - -private object CloseStaleJettyConnectionsOnMonixTimeout { - sealed trait RestApiWithNeverCounter { - final val counter = Atomic(0) - @GET def neverGet: Future[Unit] - } - - object RestApiWithNeverCounter extends DefaultRestApiCompanion[RestApiWithNeverCounter] { - final val Impl: RestApiWithNeverCounter = new RestApiWithNeverCounter { - override def neverGet: Future[Unit] = { - counter.increment() - Future.never - } - } - } -} diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala index 868ee3dcf..c32e7a5de 100644 --- a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala +++ b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala @@ -6,7 +6,9 @@ import io.udash.rest.{RestApiTestScenarios, ServletBasedRestApiTest} import org.eclipse.jetty.client.HttpClient final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios { - val client: HttpClient = new HttpClient + val client: HttpClient = new HttpClient() { + setMaxConnectionsPerDestination(MaxConnections) + } def clientHandle: HandleRequest = JettyRestClient.asHandleRequest(client, s"$baseUrl/api", maxPayloadSize) diff --git a/rest/src/test/scala/io/udash/rest/RestApiTest.scala b/rest/src/test/scala/io/udash/rest/RestApiTest.scala index c41f35f10..2ca25ca12 100644 --- a/rest/src/test/scala/io/udash/rest/RestApiTest.scala +++ b/rest/src/test/scala/io/udash/rest/RestApiTest.scala @@ -1,15 +1,25 @@ package io.udash package rest -import com.avsystem.commons._ +import com.avsystem.commons.* +import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps import io.udash.rest.raw.RawRest import io.udash.rest.raw.RawRest.HandleRequest +import monix.eval.Task import monix.execution.Scheduler import org.scalactic.source.Position +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.funsuite.AnyFunSuite +import scala.concurrent.duration.FiniteDuration + abstract class RestApiTest extends AnyFunSuite with ScalaFutures { + + protected final val MaxConnections: Int = 1 // to timeout quickly + protected final val Connections: Int = 10 // > MaxConnections + protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout + implicit def scheduler: Scheduler = Scheduler.global final val serverHandle: RawRest.HandleRequest = @@ -30,6 +40,9 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep) case _ => value } + + def getNeverGetCounter(): Int = RestTestApi.Impl.neverGetCounter.get() + def resetNeverGetCounter(): Unit = RestTestApi.Impl.neverGetCounter.set(0) } trait RestApiTestScenarios extends RestApiTest { @@ -89,6 +102,29 @@ trait RestApiTestScenarios extends RestApiTest { test("body using third party type") { testCall(_.thirdPartyBody(HasThirdParty(ThirdParty(5)))) } + + test("close connection on monix task timeout") { + resetNeverGetCounter() + Task + .traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed) + .map(_ => assertResult(expected = Connections)(actual = getNeverGetCounter())) // neverGet should be called Connections times + .runToFuture + .futureValue(Timeout(30.seconds)) + } + + test("close connection on monix task cancellation") { + resetNeverGetCounter() + Task + .traverse(List.range(0, Connections)) { i => + val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ()) + Task.sleep(100.millis) + .restartUntil(_ => getNeverGetCounter() >= i) + .map(_ => cancelable.cancel()) + } + .map(_ => assertResult(expected = Connections)(actual = getNeverGetCounter())) // neverGet should be called Connections times + .runToFuture + .futureValue(Timeout(30.seconds)) + } } class DirectRestApiTest extends RestApiTestScenarios { diff --git a/rest/src/test/scala/io/udash/rest/RestTestApi.scala b/rest/src/test/scala/io/udash/rest/RestTestApi.scala index 57cc8f0bd..5fa7b489e 100644 --- a/rest/src/test/scala/io/udash/rest/RestTestApi.scala +++ b/rest/src/test/scala/io/udash/rest/RestTestApi.scala @@ -9,6 +9,7 @@ import com.avsystem.commons.serialization.json.JsonStringOutput import io.udash.rest.openapi.adjusters.* import io.udash.rest.openapi.{Header as OASHeader, *} import io.udash.rest.raw.* +import monix.execution.atomic.Atomic import monix.execution.{FutureUtils, Scheduler} import scala.concurrent.Future @@ -94,6 +95,8 @@ case class ErrorWrapper[T](error: T) object ErrorWrapper extends HasPolyGenCodec[ErrorWrapper] trait RestTestApi { + final val neverGetCounter = Atomic(0) + @GET @group("TrivialGroup") def trivialGet: Future[Unit] @GET @group("TrivialDescribedGroup") @tagDescription("something") def failingGet: Future[Unit] @GET def jsonFailingGet: Future[Unit] @@ -181,7 +184,10 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { def failingGet: Future[Unit] = Future.failed(HttpErrorException.plain(503, "nie")) def jsonFailingGet: Future[Unit] = Future.failed(HttpErrorException(503, HttpBody.json(JsonValue(JsonStringOutput.write(ErrorWrapper("nie")))))) def moreFailingGet: Future[Unit] = throw HttpErrorException.plain(503, "nie") - def neverGet: Future[Unit] = Future.never + def neverGet: Future[Unit] = { + neverGetCounter.transform(_ + 1) + Future.never + } def wait(millis: Int): Future[String] = FutureUtils.delayedResult(millis.millis)(s"waited $millis ms") def getEntity(id: RestEntityId): Future[RestEntity] = Future.successful(RestEntity(id, s"${id.value}-name")) def complexGet(p1: Int, p2: String, h1: Int, h2: String, q1: Int, q2: String, q3: Opt[Int], c1: Int, c2: String): Future[RestEntity] = From a161834ed0e8dd3023556bbec553b70970d608bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Tue, 17 Dec 2024 13:27:24 +0100 Subject: [PATCH 4/8] make tests independent --- .../test/scala/io/udash/rest/SttpRestCallTest.scala | 2 +- rest/src/test/scala/io/udash/rest/RestApiTest.scala | 11 ++++++----- rest/src/test/scala/io/udash/rest/RestTestApi.scala | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala index b3c0aad4a..b63432550 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala @@ -5,7 +5,7 @@ import io.udash.rest.raw.HttpErrorException import io.udash.rest.raw.RawRest.HandleRequest import sttp.client3.SttpBackend -import scala.concurrent.duration._ +import scala.concurrent.duration.* import scala.concurrent.{Await, Future} trait SttpClientRestTest extends ServletBasedRestApiTest { diff --git a/rest/src/test/scala/io/udash/rest/RestApiTest.scala b/rest/src/test/scala/io/udash/rest/RestApiTest.scala index 2ca25ca12..20e9d7502 100644 --- a/rest/src/test/scala/io/udash/rest/RestApiTest.scala +++ b/rest/src/test/scala/io/udash/rest/RestApiTest.scala @@ -15,15 +15,16 @@ import org.scalatest.funsuite.AnyFunSuite import scala.concurrent.duration.FiniteDuration abstract class RestApiTest extends AnyFunSuite with ScalaFutures { - protected final val MaxConnections: Int = 1 // to timeout quickly protected final val Connections: Int = 10 // > MaxConnections protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout implicit def scheduler: Scheduler = Scheduler.global + private val impl: RestTestApi = RestTestApi.Impl + final val serverHandle: RawRest.HandleRequest = - RawRest.asHandleRequest[RestTestApi](RestTestApi.Impl) + RawRest.asHandleRequest[RestTestApi](impl) def clientHandle: RawRest.HandleRequest @@ -33,7 +34,7 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Unit = assert( call(proxy).wrapToTry.futureValue.map(mkDeep) == - call(RestTestApi.Impl).catchFailures.wrapToTry.futureValue.map(mkDeep) + call(impl).catchFailures.wrapToTry.futureValue.map(mkDeep) ) def mkDeep(value: Any): Any = value match { @@ -41,8 +42,8 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { case _ => value } - def getNeverGetCounter(): Int = RestTestApi.Impl.neverGetCounter.get() - def resetNeverGetCounter(): Unit = RestTestApi.Impl.neverGetCounter.set(0) + def getNeverGetCounter(): Int = impl.neverGetCounter.get() + def resetNeverGetCounter(): Unit = impl.neverGetCounter.set(0) } trait RestApiTestScenarios extends RestApiTest { diff --git a/rest/src/test/scala/io/udash/rest/RestTestApi.scala b/rest/src/test/scala/io/udash/rest/RestTestApi.scala index 5fa7b489e..2db062963 100644 --- a/rest/src/test/scala/io/udash/rest/RestTestApi.scala +++ b/rest/src/test/scala/io/udash/rest/RestTestApi.scala @@ -179,7 +179,7 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { import Scheduler.Implicits.global - val Impl: RestTestApi = new RestTestApi { + def Impl: RestTestApi = new RestTestApi { def trivialGet: Future[Unit] = Future.unit def failingGet: Future[Unit] = Future.failed(HttpErrorException.plain(503, "nie")) def jsonFailingGet: Future[Unit] = Future.failed(HttpErrorException(503, HttpBody.json(JsonValue(JsonStringOutput.write(ErrorWrapper("nie")))))) From b00c43b8f31ba5de46416cda74f0d82b6121049e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Tue, 17 Dec 2024 14:03:13 +0100 Subject: [PATCH 5/8] nits --- .../test/scala/io/udash/rest/SttpRestCallTest.scala | 2 +- rest/src/test/scala/io/udash/rest/RestApiTest.scala | 10 ++++++---- rest/src/test/scala/io/udash/rest/RestTestApi.scala | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala index b63432550..b3c0aad4a 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala @@ -5,7 +5,7 @@ import io.udash.rest.raw.HttpErrorException import io.udash.rest.raw.RawRest.HandleRequest import sttp.client3.SttpBackend -import scala.concurrent.duration.* +import scala.concurrent.duration._ import scala.concurrent.{Await, Future} trait SttpClientRestTest extends ServletBasedRestApiTest { diff --git a/rest/src/test/scala/io/udash/rest/RestApiTest.scala b/rest/src/test/scala/io/udash/rest/RestApiTest.scala index 20e9d7502..e19727803 100644 --- a/rest/src/test/scala/io/udash/rest/RestApiTest.scala +++ b/rest/src/test/scala/io/udash/rest/RestApiTest.scala @@ -8,9 +8,9 @@ import io.udash.rest.raw.RawRest.HandleRequest import monix.eval.Task import monix.execution.Scheduler import org.scalactic.source.Position -import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.time.{Millis, Seconds, Span} import scala.concurrent.duration.FiniteDuration @@ -21,7 +21,7 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { implicit def scheduler: Scheduler = Scheduler.global - private val impl: RestTestApi = RestTestApi.Impl + private val impl: RestTestApi = RestTestApi.impl() final val serverHandle: RawRest.HandleRequest = RawRest.asHandleRequest[RestTestApi](impl) @@ -47,6 +47,8 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { } trait RestApiTestScenarios extends RestApiTest { + override implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(50, Millis))) + test("trivial GET") { testCall(_.trivialGet) } @@ -110,7 +112,7 @@ trait RestApiTestScenarios extends RestApiTest { .traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed) .map(_ => assertResult(expected = Connections)(actual = getNeverGetCounter())) // neverGet should be called Connections times .runToFuture - .futureValue(Timeout(30.seconds)) + .futureValue } test("close connection on monix task cancellation") { @@ -124,7 +126,7 @@ trait RestApiTestScenarios extends RestApiTest { } .map(_ => assertResult(expected = Connections)(actual = getNeverGetCounter())) // neverGet should be called Connections times .runToFuture - .futureValue(Timeout(30.seconds)) + .futureValue } } diff --git a/rest/src/test/scala/io/udash/rest/RestTestApi.scala b/rest/src/test/scala/io/udash/rest/RestTestApi.scala index 2db062963..7cf330c46 100644 --- a/rest/src/test/scala/io/udash/rest/RestTestApi.scala +++ b/rest/src/test/scala/io/udash/rest/RestTestApi.scala @@ -179,7 +179,7 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { import Scheduler.Implicits.global - def Impl: RestTestApi = new RestTestApi { + def impl(): RestTestApi = new RestTestApi { def trivialGet: Future[Unit] = Future.unit def failingGet: Future[Unit] = Future.failed(HttpErrorException.plain(503, "nie")) def jsonFailingGet: Future[Unit] = Future.failed(HttpErrorException(503, HttpBody.json(JsonValue(JsonStringOutput.write(ErrorWrapper("nie")))))) From 737fce1d9c243d0ae35f26f659b5aae6a9d4bc6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Thu, 19 Dec 2024 13:29:08 +0100 Subject: [PATCH 6/8] reset counter in `beforeEach`, set IdleTimout explicitly, --- .../io/udash/rest/SttpRestCallTest.scala | 15 +++++++++--- .../udash/rest/jetty/JettyRestCallTest.scala | 1 + .../scala/io/udash/rest/RestApiTest.scala | 24 ++++++++++--------- .../scala/io/udash/rest/RestTestApi.scala | 12 ++++++---- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala index b3c0aad4a..1c5d2d658 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala @@ -3,13 +3,22 @@ package rest import io.udash.rest.raw.HttpErrorException import io.udash.rest.raw.RawRest.HandleRequest -import sttp.client3.SttpBackend +import sttp.client3.{HttpClientFutureBackend, SttpBackend} -import scala.concurrent.duration._ +import java.net.http.HttpClient +import java.time.Duration as JDuration +import scala.concurrent.duration.* import scala.concurrent.{Await, Future} trait SttpClientRestTest extends ServletBasedRestApiTest { - implicit val backend: SttpBackend[Future, Any] = SttpRestClient.defaultBackend() + implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend.usingClient( + //like defaultHttpClient but with connection timeout >> CallTimeout + HttpClient + .newBuilder() + .connectTimeout(JDuration.ofMillis(IdleTimout.toMillis)) + .followRedirects(HttpClient.Redirect.NEVER) + .build() + ) def clientHandle: HandleRequest = SttpRestClient.asHandleRequest[Future](s"$baseUrl/api") diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala index c32e7a5de..71164072b 100644 --- a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala +++ b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala @@ -8,6 +8,7 @@ import org.eclipse.jetty.client.HttpClient final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios { val client: HttpClient = new HttpClient() { setMaxConnectionsPerDestination(MaxConnections) + setIdleTimeout(IdleTimout.toMillis) } def clientHandle: HandleRequest = diff --git a/rest/src/test/scala/io/udash/rest/RestApiTest.scala b/rest/src/test/scala/io/udash/rest/RestApiTest.scala index e19727803..85d7b0045 100644 --- a/rest/src/test/scala/io/udash/rest/RestApiTest.scala +++ b/rest/src/test/scala/io/udash/rest/RestApiTest.scala @@ -8,20 +8,27 @@ import io.udash.rest.raw.RawRest.HandleRequest import monix.eval.Task import monix.execution.Scheduler import org.scalactic.source.Position +import org.scalatest.BeforeAndAfterEach import org.scalatest.concurrent.ScalaFutures import org.scalatest.funsuite.AnyFunSuite import org.scalatest.time.{Millis, Seconds, Span} import scala.concurrent.duration.FiniteDuration -abstract class RestApiTest extends AnyFunSuite with ScalaFutures { +abstract class RestApiTest extends AnyFunSuite with ScalaFutures with BeforeAndAfterEach { + implicit def scheduler: Scheduler = Scheduler.global + protected final val MaxConnections: Int = 1 // to timeout quickly protected final val Connections: Int = 10 // > MaxConnections protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout + protected final val IdleTimout: FiniteDuration = CallTimeout * 100 - implicit def scheduler: Scheduler = Scheduler.global + protected val impl: RestTestApi.Impl = new RestTestApi.Impl - private val impl: RestTestApi = RestTestApi.impl() + override protected def beforeEach(): Unit = { + super.beforeEach() + impl.resetCounter() + } final val serverHandle: RawRest.HandleRequest = RawRest.asHandleRequest[RestTestApi](impl) @@ -41,9 +48,6 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep) case _ => value } - - def getNeverGetCounter(): Int = impl.neverGetCounter.get() - def resetNeverGetCounter(): Unit = impl.neverGetCounter.set(0) } trait RestApiTestScenarios extends RestApiTest { @@ -107,24 +111,22 @@ trait RestApiTestScenarios extends RestApiTest { } test("close connection on monix task timeout") { - resetNeverGetCounter() Task .traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed) - .map(_ => assertResult(expected = Connections)(actual = getNeverGetCounter())) // neverGet should be called Connections times + .map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times .runToFuture .futureValue } test("close connection on monix task cancellation") { - resetNeverGetCounter() Task .traverse(List.range(0, Connections)) { i => val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ()) Task.sleep(100.millis) - .restartUntil(_ => getNeverGetCounter() >= i) + .restartUntil(_ => impl.counterValue() >= i) .map(_ => cancelable.cancel()) } - .map(_ => assertResult(expected = Connections)(actual = getNeverGetCounter())) // neverGet should be called Connections times + .map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times .runToFuture .futureValue } diff --git a/rest/src/test/scala/io/udash/rest/RestTestApi.scala b/rest/src/test/scala/io/udash/rest/RestTestApi.scala index 7cf330c46..64ad06b33 100644 --- a/rest/src/test/scala/io/udash/rest/RestTestApi.scala +++ b/rest/src/test/scala/io/udash/rest/RestTestApi.scala @@ -9,7 +9,7 @@ import com.avsystem.commons.serialization.json.JsonStringOutput import io.udash.rest.openapi.adjusters.* import io.udash.rest.openapi.{Header as OASHeader, *} import io.udash.rest.raw.* -import monix.execution.atomic.Atomic +import monix.execution.atomic.{Atomic, AtomicInt} import monix.execution.{FutureUtils, Scheduler} import scala.concurrent.Future @@ -95,7 +95,6 @@ case class ErrorWrapper[T](error: T) object ErrorWrapper extends HasPolyGenCodec[ErrorWrapper] trait RestTestApi { - final val neverGetCounter = Atomic(0) @GET @group("TrivialGroup") def trivialGet: Future[Unit] @GET @group("TrivialDescribedGroup") @tagDescription("something") def failingGet: Future[Unit] @@ -179,13 +178,13 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { import Scheduler.Implicits.global - def impl(): RestTestApi = new RestTestApi { + final class Impl extends RestTestApi { def trivialGet: Future[Unit] = Future.unit def failingGet: Future[Unit] = Future.failed(HttpErrorException.plain(503, "nie")) def jsonFailingGet: Future[Unit] = Future.failed(HttpErrorException(503, HttpBody.json(JsonValue(JsonStringOutput.write(ErrorWrapper("nie")))))) def moreFailingGet: Future[Unit] = throw HttpErrorException.plain(503, "nie") def neverGet: Future[Unit] = { - neverGetCounter.transform(_ + 1) + counter.increment() Future.never } def wait(millis: Int): Future[String] = FutureUtils.delayedResult(millis.millis)(s"waited $millis ms") @@ -209,6 +208,11 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { def wrappedBinaryEcho(bytes: Bytes): Future[Bytes] = Future.successful(bytes) def wrappedBody(id: RestEntityId): Future[RestEntityId] = Future.successful(id) def thirdPartyBody(dur: HasThirdParty): Future[HasThirdParty] = Future.successful(dur) + + /** Counter for neverGet calls */ + private val counter: AtomicInt = Atomic(0) + def counterValue(): Int = counter.get() + def resetCounter(): Unit = counter.set(0) } } From 01cf9f01028541dc0e5e504990d91146fdf776b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Mon, 23 Dec 2024 15:06:17 +0100 Subject: [PATCH 7/8] Refactor REST API tests to use consistent async syntax Updated test syntax to use `in` for better readability and migrated to fully asynchronous assertions where applicable. Removed redundant `.futureValue` statements, ensuring non-blocking behavior across test methods. --- .../udash/rest/ServletBasedRestApiTest.scala | 5 +- .../io/udash/rest/SttpRestCallTest.scala | 21 +++++--- .../scala/io/udash/rest/RestApiTest.scala | 51 +++++++++---------- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala index 284955cfe..fff02b3b2 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala @@ -3,12 +3,11 @@ package rest import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder} import org.eclipse.jetty.server.Server -import org.eclipse.jetty.ee8.servlet.{ServletHandler, ServletHolder} -import scala.concurrent.duration._ +import scala.concurrent.duration.* abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer { - override implicit def patienceConfig: PatienceConfig = PatienceConfig(10.seconds) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds) def maxPayloadSize: Int = 1024 * 1024 def serverTimeout: FiniteDuration = 10.seconds diff --git a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala index 1c5d2d658..f77fa0ded 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala @@ -30,22 +30,27 @@ trait SttpClientRestTest extends ServletBasedRestApiTest { } class SttpRestCallTest extends SttpClientRestTest with RestApiTestScenarios { - test("too large binary request") { - val future = proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5)) - val exception = future.failed.futureValue - assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)")) + "too large binary request" in { + proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5)) + .failed + .map { exception => + assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)")) + } } } class ServletTimeoutTest extends SttpClientRestTest { override def serverTimeout: FiniteDuration = 500.millis - test("rest method timeout") { - val exception = proxy.neverGet.failed.futureValue - assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds")) + "rest method timeout" in { + proxy.neverGet + .failed + .map { exception => + assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds")) + } } - test("subsequent requests with timeout") { + "subsequent requests with timeout" in { assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf)) assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf)) assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf)) diff --git a/rest/src/test/scala/io/udash/rest/RestApiTest.scala b/rest/src/test/scala/io/udash/rest/RestApiTest.scala index 85d7b0045..50142a219 100644 --- a/rest/src/test/scala/io/udash/rest/RestApiTest.scala +++ b/rest/src/test/scala/io/udash/rest/RestApiTest.scala @@ -1,21 +1,21 @@ package io.udash package rest +import cats.implicits.catsSyntaxTuple2Semigroupal import com.avsystem.commons.* import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps import io.udash.rest.raw.RawRest import io.udash.rest.raw.RawRest.HandleRequest +import io.udash.testing.AsyncUdashSharedTest import monix.eval.Task import monix.execution.Scheduler import org.scalactic.source.Position -import org.scalatest.BeforeAndAfterEach -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.funsuite.AnyFunSuite import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.{Assertion, BeforeAndAfterEach} import scala.concurrent.duration.FiniteDuration -abstract class RestApiTest extends AnyFunSuite with ScalaFutures with BeforeAndAfterEach { +abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach { implicit def scheduler: Scheduler = Scheduler.global protected final val MaxConnections: Int = 1 // to timeout quickly @@ -38,11 +38,10 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures with BeforeAndA lazy val proxy: RestTestApi = RawRest.fromHandleRequest[RestTestApi](clientHandle) - def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Unit = - assert( - call(proxy).wrapToTry.futureValue.map(mkDeep) == - call(impl).catchFailures.wrapToTry.futureValue.map(mkDeep) - ) + def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Future[Assertion] = + (call(proxy).wrapToTry, call(impl).catchFailures.wrapToTry).mapN { (proxyResult, implResult) => + assert(proxyResult.map(mkDeep) == implResult.map(mkDeep)) + } def mkDeep(value: Any): Any = value match { case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep) @@ -53,72 +52,71 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures with BeforeAndA trait RestApiTestScenarios extends RestApiTest { override implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(50, Millis))) - test("trivial GET") { + "trivial GET" in { testCall(_.trivialGet) } - test("failing GET") { + "failing GET" in { testCall(_.failingGet) } - test("JSON failing GET") { + "JSON failing GET" in { testCall(_.jsonFailingGet) } - test("more failing GET") { + "more failing GET" in { testCall(_.moreFailingGet) } - test("complex GET") { + "complex GET" in { testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt(3), 4, "ó /&f")) testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt.Empty, 3, "ó /&f")) } - test("multi-param body POST") { + "multi-param body POST" in { testCall(_.multiParamPost(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", 3, "l\"l")) } - test("single body PUT") { + "single body PUT" in { testCall(_.singleBodyPut(RestEntity(RestEntityId("id"), "señor"))) } - test("form POST") { + "form POST" in { testCall(_.formPost("ó", "ą=ę", 42)) } - test("prefixed GET") { + "prefixed GET" in { testCall(_.prefix("p0", "h0", "q0").subget(0, 1, 2)) } - test("transparent prefix GET") { + "transparent prefix GET" in { testCall(_.transparentPrefix.subget(0, 1, 2)) } - test("custom response with headers") { + "custom response with headers" in { testCall(_.customResponse("walue")) } - test("binary request and response") { + "binary request and response" in { testCall(_.binaryEcho(Array.fill[Byte](5)(5))) } - test("large binary request and response") { + "large binary request and response" in { testCall(_.binaryEcho(Array.fill[Byte](1024 * 1024)(5))) } - test("body using third party type") { + "body using third party type" in { testCall(_.thirdPartyBody(HasThirdParty(ThirdParty(5)))) } - test("close connection on monix task timeout") { + "close connection on monix task timeout" in { Task .traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed) .map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times .runToFuture - .futureValue } - test("close connection on monix task cancellation") { + "close connection on monix task cancellation" in { Task .traverse(List.range(0, Connections)) { i => val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ()) @@ -128,7 +126,6 @@ trait RestApiTestScenarios extends RestApiTest { } .map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times .runToFuture - .futureValue } } From c1492564a3d178f43cd0689ad56f772843c75ce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kozak?= Date: Mon, 23 Dec 2024 15:45:02 +0100 Subject: [PATCH 8/8] Add detailed comments for custom HttpClient setups Clarify the purpose of HttpClient configurations in `SttpRestCallTest` and `JettyRestCallTest`. The added comments explain the use of a connection timeout significantly exceeding the CallTimeout value, improving code readability and maintainability. --- .../.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala | 5 ++++- .../test/scala/io/udash/rest/jetty/JettyRestCallTest.scala | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala index f77fa0ded..38fdee737 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala @@ -11,8 +11,11 @@ import scala.concurrent.duration.* import scala.concurrent.{Await, Future} trait SttpClientRestTest extends ServletBasedRestApiTest { + /** + * Similar to the defaultHttpClient, but with a connection timeout + * significantly exceeding the value of the CallTimeout + */ implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend.usingClient( - //like defaultHttpClient but with connection timeout >> CallTimeout HttpClient .newBuilder() .connectTimeout(JDuration.ofMillis(IdleTimout.toMillis)) diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala index 71164072b..22fb5ce69 100644 --- a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala +++ b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala @@ -6,6 +6,10 @@ import io.udash.rest.{RestApiTestScenarios, ServletBasedRestApiTest} import org.eclipse.jetty.client.HttpClient final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios { + /** + * Similar to the default HttpClient, but with a connection timeout + * significantly exceeding the value of the CallTimeout + */ val client: HttpClient = new HttpClient() { setMaxConnectionsPerDestination(MaxConnections) setIdleTimeout(IdleTimout.toMillis)