diff --git a/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala index 284955cfe..fff02b3b2 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala @@ -3,12 +3,11 @@ package rest import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder} import org.eclipse.jetty.server.Server -import org.eclipse.jetty.ee8.servlet.{ServletHandler, ServletHolder} -import scala.concurrent.duration._ +import scala.concurrent.duration.* abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer { - override implicit def patienceConfig: PatienceConfig = PatienceConfig(10.seconds) + override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds) def maxPayloadSize: Int = 1024 * 1024 def serverTimeout: FiniteDuration = 10.seconds diff --git a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala index b3c0aad4a..38fdee737 100644 --- a/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala +++ b/rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala @@ -3,13 +3,25 @@ package rest import io.udash.rest.raw.HttpErrorException import io.udash.rest.raw.RawRest.HandleRequest -import sttp.client3.SttpBackend +import sttp.client3.{HttpClientFutureBackend, SttpBackend} -import scala.concurrent.duration._ +import java.net.http.HttpClient +import java.time.Duration as JDuration +import scala.concurrent.duration.* import scala.concurrent.{Await, Future} trait SttpClientRestTest extends ServletBasedRestApiTest { - implicit val backend: SttpBackend[Future, Any] = SttpRestClient.defaultBackend() + /** + * Similar to the defaultHttpClient, but with a connection timeout + * significantly exceeding the value of the CallTimeout + */ + implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend.usingClient( + HttpClient + .newBuilder() + .connectTimeout(JDuration.ofMillis(IdleTimout.toMillis)) + .followRedirects(HttpClient.Redirect.NEVER) + .build() + ) def clientHandle: HandleRequest = SttpRestClient.asHandleRequest[Future](s"$baseUrl/api") @@ -21,22 +33,27 @@ trait SttpClientRestTest extends ServletBasedRestApiTest { } class SttpRestCallTest extends SttpClientRestTest with RestApiTestScenarios { - test("too large binary request") { - val future = proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5)) - val exception = future.failed.futureValue - assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)")) + "too large binary request" in { + proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5)) + .failed + .map { exception => + assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)")) + } } } class ServletTimeoutTest extends SttpClientRestTest { override def serverTimeout: FiniteDuration = 500.millis - test("rest method timeout") { - val exception = proxy.neverGet.failed.futureValue - assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds")) + "rest method timeout" in { + proxy.neverGet + .failed + .map { exception => + assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds")) + } } - test("subsequent requests with timeout") { + "subsequent requests with timeout" in { assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf)) assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf)) assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf)) diff --git a/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala b/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala index ed8c4e6af..86f1118a1 100644 --- a/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala +++ b/rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala @@ -1,17 +1,18 @@ package io.udash package rest.jetty -import com.avsystem.commons._ +import com.avsystem.commons.* import com.avsystem.commons.annotation.explicitGenerics -import io.udash.rest.raw._ +import io.udash.rest.raw.* import io.udash.utils.URLEncoder import monix.eval.Task -import org.eclipse.jetty.client.{BufferingResponseListener, BytesRequestContent, HttpClient, Result, StringRequestContent} +import monix.execution.Callback +import org.eclipse.jetty.client.* import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes} import java.nio.charset.Charset -import scala.concurrent.duration._ -import scala.util.{Failure, Success} +import scala.concurrent.CancellationException +import scala.concurrent.duration.* object JettyRestClient { final val DefaultMaxResponseLength = 2 * 1024 * 1024 @@ -31,55 +32,57 @@ object JettyRestClient { maxResponseLength: Int = DefaultMaxResponseLength, timeout: Duration = DefaultTimeout ): RawRest.HandleRequest = - request => Task.async { callback => - val path = baseUrl + PlainValue.encodePath(request.parameters.path) - val httpReq = client.newRequest(baseUrl).method(request.method.name) + request => Task(client.newRequest(baseUrl).method(request.method.name)).flatMap { httpReq => + Task.async { (callback: Callback[Throwable, RestResponse]) => + val path = baseUrl + PlainValue.encodePath(request.parameters.path) - httpReq.path(path) - request.parameters.query.entries.foreach { - case (name, PlainValue(value)) => httpReq.param(name, value) - } - request.parameters.headers.entries.foreach { - case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value)) - } - request.parameters.cookies.entries.foreach { - case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build( - URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build()) - } - - request.body match { - case HttpBody.Empty => - case tb: HttpBody.Textual => - httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset))) - case bb: HttpBody.Binary => - httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes)) - } + httpReq.path(path) + request.parameters.query.entries.foreach { + case (name, PlainValue(value)) => httpReq.param(name, value) + } + request.parameters.headers.entries.foreach { + case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value)) + } + request.parameters.cookies.entries.foreach { + case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build( + URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build()) + } - timeout match { - case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit) - case _ => - } + request.body match { + case HttpBody.Empty => + case tb: HttpBody.Textual => + httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset))) + case bb: HttpBody.Binary => + httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes)) + } - httpReq.send(new BufferingResponseListener(maxResponseLength) { - override def onComplete(result: Result): Unit = - if (result.isSucceeded) { - val httpResp = result.getResponse - val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt - val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType) - val body = (contentTypeOpt, charsetOpt) match { - case (Opt(contentType), Opt(charset)) => - HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset) - case (Opt(contentType), Opt.Empty) => - HttpBody.binary(getContent, contentType) - case _ => - HttpBody.Empty - } - val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList - val response = RestResponse(httpResp.getStatus, IMapping(headers), body) - callback(Success(response)) - } else { - callback(Failure(result.getFailure)) + timeout match { + case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit) + case _ => } - }) + + httpReq.send(new BufferingResponseListener(maxResponseLength) { + override def onComplete(result: Result): Unit = + if (result.isSucceeded) { + val httpResp = result.getResponse + val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt + val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType) + val body = (contentTypeOpt, charsetOpt) match { + case (Opt(contentType), Opt(charset)) => + HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset) + case (Opt(contentType), Opt.Empty) => + HttpBody.binary(getContent, contentType) + case _ => + HttpBody.Empty + } + val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList + val response = RestResponse(httpResp.getStatus, IMapping(headers), body) + callback(Success(response)) + } else { + callback(Failure(result.getFailure)) + } + }) + } + .doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled")))) } } diff --git a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala index 868ee3dcf..22fb5ce69 100644 --- a/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala +++ b/rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala @@ -6,7 +6,14 @@ import io.udash.rest.{RestApiTestScenarios, ServletBasedRestApiTest} import org.eclipse.jetty.client.HttpClient final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios { - val client: HttpClient = new HttpClient + /** + * Similar to the default HttpClient, but with a connection timeout + * significantly exceeding the value of the CallTimeout + */ + val client: HttpClient = new HttpClient() { + setMaxConnectionsPerDestination(MaxConnections) + setIdleTimeout(IdleTimout.toMillis) + } def clientHandle: HandleRequest = JettyRestClient.asHandleRequest(client, s"$baseUrl/api", maxPayloadSize) diff --git a/rest/src/test/scala/io/udash/rest/RestApiTest.scala b/rest/src/test/scala/io/udash/rest/RestApiTest.scala index c41f35f10..50142a219 100644 --- a/rest/src/test/scala/io/udash/rest/RestApiTest.scala +++ b/rest/src/test/scala/io/udash/rest/RestApiTest.scala @@ -1,30 +1,47 @@ package io.udash package rest -import com.avsystem.commons._ +import cats.implicits.catsSyntaxTuple2Semigroupal +import com.avsystem.commons.* +import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps import io.udash.rest.raw.RawRest import io.udash.rest.raw.RawRest.HandleRequest +import io.udash.testing.AsyncUdashSharedTest +import monix.eval.Task import monix.execution.Scheduler import org.scalactic.source.Position -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.{Assertion, BeforeAndAfterEach} -abstract class RestApiTest extends AnyFunSuite with ScalaFutures { +import scala.concurrent.duration.FiniteDuration + +abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach { implicit def scheduler: Scheduler = Scheduler.global + protected final val MaxConnections: Int = 1 // to timeout quickly + protected final val Connections: Int = 10 // > MaxConnections + protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout + protected final val IdleTimout: FiniteDuration = CallTimeout * 100 + + protected val impl: RestTestApi.Impl = new RestTestApi.Impl + + override protected def beforeEach(): Unit = { + super.beforeEach() + impl.resetCounter() + } + final val serverHandle: RawRest.HandleRequest = - RawRest.asHandleRequest[RestTestApi](RestTestApi.Impl) + RawRest.asHandleRequest[RestTestApi](impl) def clientHandle: RawRest.HandleRequest lazy val proxy: RestTestApi = RawRest.fromHandleRequest[RestTestApi](clientHandle) - def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Unit = - assert( - call(proxy).wrapToTry.futureValue.map(mkDeep) == - call(RestTestApi.Impl).catchFailures.wrapToTry.futureValue.map(mkDeep) - ) + def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Future[Assertion] = + (call(proxy).wrapToTry, call(impl).catchFailures.wrapToTry).mapN { (proxyResult, implResult) => + assert(proxyResult.map(mkDeep) == implResult.map(mkDeep)) + } def mkDeep(value: Any): Any = value match { case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep) @@ -33,62 +50,83 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures { } trait RestApiTestScenarios extends RestApiTest { - test("trivial GET") { + override implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(50, Millis))) + + "trivial GET" in { testCall(_.trivialGet) } - test("failing GET") { + "failing GET" in { testCall(_.failingGet) } - test("JSON failing GET") { + "JSON failing GET" in { testCall(_.jsonFailingGet) } - test("more failing GET") { + "more failing GET" in { testCall(_.moreFailingGet) } - test("complex GET") { + "complex GET" in { testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt(3), 4, "ó /&f")) testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt.Empty, 3, "ó /&f")) } - test("multi-param body POST") { + "multi-param body POST" in { testCall(_.multiParamPost(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", 3, "l\"l")) } - test("single body PUT") { + "single body PUT" in { testCall(_.singleBodyPut(RestEntity(RestEntityId("id"), "señor"))) } - test("form POST") { + "form POST" in { testCall(_.formPost("ó", "ą=ę", 42)) } - test("prefixed GET") { + "prefixed GET" in { testCall(_.prefix("p0", "h0", "q0").subget(0, 1, 2)) } - test("transparent prefix GET") { + "transparent prefix GET" in { testCall(_.transparentPrefix.subget(0, 1, 2)) } - test("custom response with headers") { + "custom response with headers" in { testCall(_.customResponse("walue")) } - test("binary request and response") { + "binary request and response" in { testCall(_.binaryEcho(Array.fill[Byte](5)(5))) } - test("large binary request and response") { + "large binary request and response" in { testCall(_.binaryEcho(Array.fill[Byte](1024 * 1024)(5))) } - test("body using third party type") { + "body using third party type" in { testCall(_.thirdPartyBody(HasThirdParty(ThirdParty(5)))) } + + "close connection on monix task timeout" in { + Task + .traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed) + .map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times + .runToFuture + } + + "close connection on monix task cancellation" in { + Task + .traverse(List.range(0, Connections)) { i => + val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ()) + Task.sleep(100.millis) + .restartUntil(_ => impl.counterValue() >= i) + .map(_ => cancelable.cancel()) + } + .map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times + .runToFuture + } } class DirectRestApiTest extends RestApiTestScenarios { diff --git a/rest/src/test/scala/io/udash/rest/RestTestApi.scala b/rest/src/test/scala/io/udash/rest/RestTestApi.scala index 50409fe7c..64ad06b33 100644 --- a/rest/src/test/scala/io/udash/rest/RestTestApi.scala +++ b/rest/src/test/scala/io/udash/rest/RestTestApi.scala @@ -1,14 +1,15 @@ package io.udash package rest -import com.avsystem.commons._ +import com.avsystem.commons.* import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx} import com.avsystem.commons.rpc.AsRawReal +import com.avsystem.commons.serialization.* import com.avsystem.commons.serialization.json.JsonStringOutput -import com.avsystem.commons.serialization.{GenCodec, HasPolyGenCodec, flatten, name, whenAbsent} -import io.udash.rest.openapi.adjusters._ -import io.udash.rest.openapi.{Header => OASHeader, _} -import io.udash.rest.raw._ +import io.udash.rest.openapi.adjusters.* +import io.udash.rest.openapi.{Header as OASHeader, *} +import io.udash.rest.raw.* +import monix.execution.atomic.{Atomic, AtomicInt} import monix.execution.{FutureUtils, Scheduler} import scala.concurrent.Future @@ -94,6 +95,7 @@ case class ErrorWrapper[T](error: T) object ErrorWrapper extends HasPolyGenCodec[ErrorWrapper] trait RestTestApi { + @GET @group("TrivialGroup") def trivialGet: Future[Unit] @GET @group("TrivialDescribedGroup") @tagDescription("something") def failingGet: Future[Unit] @GET def jsonFailingGet: Future[Unit] @@ -176,12 +178,15 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { import Scheduler.Implicits.global - val Impl: RestTestApi = new RestTestApi { + final class Impl extends RestTestApi { def trivialGet: Future[Unit] = Future.unit def failingGet: Future[Unit] = Future.failed(HttpErrorException.plain(503, "nie")) def jsonFailingGet: Future[Unit] = Future.failed(HttpErrorException(503, HttpBody.json(JsonValue(JsonStringOutput.write(ErrorWrapper("nie")))))) def moreFailingGet: Future[Unit] = throw HttpErrorException.plain(503, "nie") - def neverGet: Future[Unit] = Future.never + def neverGet: Future[Unit] = { + counter.increment() + Future.never + } def wait(millis: Int): Future[String] = FutureUtils.delayedResult(millis.millis)(s"waited $millis ms") def getEntity(id: RestEntityId): Future[RestEntity] = Future.successful(RestEntity(id, s"${id.value}-name")) def complexGet(p1: Int, p2: String, h1: Int, h2: String, q1: Int, q2: String, q3: Opt[Int], c1: Int, c2: String): Future[RestEntity] = @@ -203,6 +208,11 @@ object RestTestApi extends DefaultRestApiCompanion[RestTestApi] { def wrappedBinaryEcho(bytes: Bytes): Future[Bytes] = Future.successful(bytes) def wrappedBody(id: RestEntityId): Future[RestEntityId] = Future.successful(id) def thirdPartyBody(dur: HasThirdParty): Future[HasThirdParty] = Future.successful(dur) + + /** Counter for neverGet calls */ + private val counter: AtomicInt = Atomic(0) + def counterValue(): Int = counter.get() + def resetCounter(): Unit = counter.set(0) } }