Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package rest

import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.ee8.servlet.{ServletHandler, ServletHolder}

import scala.concurrent.duration._
import scala.concurrent.duration.*

abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer {
override implicit def patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)

def maxPayloadSize: Int = 1024 * 1024
def serverTimeout: FiniteDuration = 10.seconds
Expand Down
36 changes: 25 additions & 11 deletions rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ package rest

import io.udash.rest.raw.HttpErrorException
import io.udash.rest.raw.RawRest.HandleRequest
import sttp.client3.SttpBackend
import sttp.client3.{HttpClientFutureBackend, SttpBackend}

import scala.concurrent.duration._
import java.net.http.HttpClient
import java.time.Duration as JDuration
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}

trait SttpClientRestTest extends ServletBasedRestApiTest {
implicit val backend: SttpBackend[Future, Any] = SttpRestClient.defaultBackend()
implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend.usingClient(
//like defaultHttpClient but with connection timeout >> CallTimeout
HttpClient
.newBuilder()
.connectTimeout(JDuration.ofMillis(IdleTimout.toMillis))
.followRedirects(HttpClient.Redirect.NEVER)
.build()
)

def clientHandle: HandleRequest =
SttpRestClient.asHandleRequest[Future](s"$baseUrl/api")
Expand All @@ -21,22 +30,27 @@ trait SttpClientRestTest extends ServletBasedRestApiTest {
}

class SttpRestCallTest extends SttpClientRestTest with RestApiTestScenarios {
test("too large binary request") {
val future = proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5))
val exception = future.failed.futureValue
assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)"))
"too large binary request" in {
proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5))
.failed
.map { exception =>
assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)"))
}
}
}

class ServletTimeoutTest extends SttpClientRestTest {
override def serverTimeout: FiniteDuration = 500.millis

test("rest method timeout") {
val exception = proxy.neverGet.failed.futureValue
assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds"))
"rest method timeout" in {
proxy.neverGet
.failed
.map { exception =>
assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds"))
}
}

test("subsequent requests with timeout") {
"subsequent requests with timeout" in {
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
Expand Down
105 changes: 54 additions & 51 deletions rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package io.udash
package rest.jetty

import com.avsystem.commons._
import com.avsystem.commons.*
import com.avsystem.commons.annotation.explicitGenerics
import io.udash.rest.raw._
import io.udash.rest.raw.*
import io.udash.utils.URLEncoder
import monix.eval.Task
import org.eclipse.jetty.client.{BufferingResponseListener, BytesRequestContent, HttpClient, Result, StringRequestContent}
import monix.execution.Callback
import org.eclipse.jetty.client.*
import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes}

import java.nio.charset.Charset
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.concurrent.CancellationException
import scala.concurrent.duration.*

object JettyRestClient {
final val DefaultMaxResponseLength = 2 * 1024 * 1024
Expand All @@ -31,55 +32,57 @@ object JettyRestClient {
maxResponseLength: Int = DefaultMaxResponseLength,
timeout: Duration = DefaultTimeout
): RawRest.HandleRequest =
request => Task.async { callback =>
val path = baseUrl + PlainValue.encodePath(request.parameters.path)
val httpReq = client.newRequest(baseUrl).method(request.method.name)
request => Task(client.newRequest(baseUrl).method(request.method.name)).flatMap { httpReq =>
Task.async { (callback: Callback[Throwable, RestResponse]) =>
val path = baseUrl + PlainValue.encodePath(request.parameters.path)

httpReq.path(path)
request.parameters.query.entries.foreach {
case (name, PlainValue(value)) => httpReq.param(name, value)
}
request.parameters.headers.entries.foreach {
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
}
request.parameters.cookies.entries.foreach {
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
}

request.body match {
case HttpBody.Empty =>
case tb: HttpBody.Textual =>
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
case bb: HttpBody.Binary =>
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
}
httpReq.path(path)
request.parameters.query.entries.foreach {
case (name, PlainValue(value)) => httpReq.param(name, value)
}
request.parameters.headers.entries.foreach {
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
}
request.parameters.cookies.entries.foreach {
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
}

timeout match {
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
case _ =>
}
request.body match {
case HttpBody.Empty =>
case tb: HttpBody.Textual =>
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
case bb: HttpBody.Binary =>
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
}

httpReq.send(new BufferingResponseListener(maxResponseLength) {
override def onComplete(result: Result): Unit =
if (result.isSucceeded) {
val httpResp = result.getResponse
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
val body = (contentTypeOpt, charsetOpt) match {
case (Opt(contentType), Opt(charset)) =>
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
case (Opt(contentType), Opt.Empty) =>
HttpBody.binary(getContent, contentType)
case _ =>
HttpBody.Empty
}
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
callback(Success(response))
} else {
callback(Failure(result.getFailure))
timeout match {
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
case _ =>
}
})

httpReq.send(new BufferingResponseListener(maxResponseLength) {
override def onComplete(result: Result): Unit =
if (result.isSucceeded) {
val httpResp = result.getResponse
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
val body = (contentTypeOpt, charsetOpt) match {
case (Opt(contentType), Opt(charset)) =>
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
case (Opt(contentType), Opt.Empty) =>
HttpBody.binary(getContent, contentType)
case _ =>
HttpBody.Empty
}
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
callback(Success(response))
} else {
callback(Failure(result.getFailure))
}
})
}
.doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled"))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import io.udash.rest.{RestApiTestScenarios, ServletBasedRestApiTest}
import org.eclipse.jetty.client.HttpClient

final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios {
val client: HttpClient = new HttpClient
val client: HttpClient = new HttpClient() {
setMaxConnectionsPerDestination(MaxConnections)
setIdleTimeout(IdleTimout.toMillis)
}

def clientHandle: HandleRequest =
JettyRestClient.asHandleRequest(client, s"$baseUrl/api", maxPayloadSize)
Expand Down
86 changes: 62 additions & 24 deletions rest/src/test/scala/io/udash/rest/RestApiTest.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,47 @@
package io.udash
package rest

import com.avsystem.commons._
import cats.implicits.catsSyntaxTuple2Semigroupal
import com.avsystem.commons.*
import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps
import io.udash.rest.raw.RawRest
import io.udash.rest.raw.RawRest.HandleRequest
import io.udash.testing.AsyncUdashSharedTest
import monix.eval.Task
import monix.execution.Scheduler
import org.scalactic.source.Position
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{Assertion, BeforeAndAfterEach}

abstract class RestApiTest extends AnyFunSuite with ScalaFutures {
import scala.concurrent.duration.FiniteDuration

abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach {
implicit def scheduler: Scheduler = Scheduler.global

protected final val MaxConnections: Int = 1 // to timeout quickly
protected final val Connections: Int = 10 // > MaxConnections
protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout
protected final val IdleTimout: FiniteDuration = CallTimeout * 100

protected val impl: RestTestApi.Impl = new RestTestApi.Impl

override protected def beforeEach(): Unit = {
super.beforeEach()
impl.resetCounter()
}

final val serverHandle: RawRest.HandleRequest =
RawRest.asHandleRequest[RestTestApi](RestTestApi.Impl)
RawRest.asHandleRequest[RestTestApi](impl)

def clientHandle: RawRest.HandleRequest

lazy val proxy: RestTestApi =
RawRest.fromHandleRequest[RestTestApi](clientHandle)

def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Unit =
assert(
call(proxy).wrapToTry.futureValue.map(mkDeep) ==
call(RestTestApi.Impl).catchFailures.wrapToTry.futureValue.map(mkDeep)
)
def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Future[Assertion] =
(call(proxy).wrapToTry, call(impl).catchFailures.wrapToTry).mapN { (proxyResult, implResult) =>
assert(proxyResult.map(mkDeep) == implResult.map(mkDeep))
}

def mkDeep(value: Any): Any = value match {
case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep)
Expand All @@ -33,62 +50,83 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures {
}

trait RestApiTestScenarios extends RestApiTest {
test("trivial GET") {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(50, Millis)))

"trivial GET" in {
testCall(_.trivialGet)
}

test("failing GET") {
"failing GET" in {
testCall(_.failingGet)
}

test("JSON failing GET") {
"JSON failing GET" in {
testCall(_.jsonFailingGet)
}

test("more failing GET") {
"more failing GET" in {
testCall(_.moreFailingGet)
}

test("complex GET") {
"complex GET" in {
testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt(3), 4, "ó /&f"))
testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt.Empty, 3, "ó /&f"))
}

test("multi-param body POST") {
"multi-param body POST" in {
testCall(_.multiParamPost(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", 3, "l\"l"))
}

test("single body PUT") {
"single body PUT" in {
testCall(_.singleBodyPut(RestEntity(RestEntityId("id"), "señor")))
}

test("form POST") {
"form POST" in {
testCall(_.formPost("ó", "ą=ę", 42))
}

test("prefixed GET") {
"prefixed GET" in {
testCall(_.prefix("p0", "h0", "q0").subget(0, 1, 2))
}

test("transparent prefix GET") {
"transparent prefix GET" in {
testCall(_.transparentPrefix.subget(0, 1, 2))
}

test("custom response with headers") {
"custom response with headers" in {
testCall(_.customResponse("walue"))
}

test("binary request and response") {
"binary request and response" in {
testCall(_.binaryEcho(Array.fill[Byte](5)(5)))
}

test("large binary request and response") {
"large binary request and response" in {
testCall(_.binaryEcho(Array.fill[Byte](1024 * 1024)(5)))
}

test("body using third party type") {
"body using third party type" in {
testCall(_.thirdPartyBody(HasThirdParty(ThirdParty(5))))
}

"close connection on monix task timeout" in {
Task
.traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed)
.map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times
.runToFuture
}

"close connection on monix task cancellation" in {
Task
.traverse(List.range(0, Connections)) { i =>
val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ())
Task.sleep(100.millis)
.restartUntil(_ => impl.counterValue() >= i)
.map(_ => cancelable.cancel())
}
.map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times
.runToFuture
}
}

class DirectRestApiTest extends RestApiTestScenarios {
Expand Down
Loading