Skip to content

Commit 604708c

Browse files
committed
Batching only for Json list, more tests
1 parent 7debb1b commit 604708c

File tree

9 files changed

+26
-16
lines changed

9 files changed

+26
-16
lines changed

rest/.jvm/src/main/scala/io/udash/rest/RestServlet.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ object RestServlet {
2222
final val DefaultHandleTimeout = 30.seconds
2323
final val DefaultMaxPayloadSize = 16 * 1024 * 1024L // 16MB
2424
final val CookieHeader = "Cookie"
25-
final val DefaultStreamingBatchSize = 1
25+
final val DefaultStreamingBatchSize = 100
2626
private final val BufferSize = 8192
2727

2828
/**
@@ -32,7 +32,7 @@ object RestServlet {
3232
* @param handleTimeout maximum time the servlet will wait for results returned by REST API implementation
3333
* @param maxPayloadSize maximum acceptable incoming payload size, in bytes;
3434
* if exceeded, `413 Payload Too Large` response will be sent back
35-
* @param defaultStreamingBatchSize default batch size for [[StreamedRestResponse]]
35+
* @param defaultStreamingBatchSize default batch when streaming [[StreamedBody.JsonList]]
3636
*/
3737
@explicitGenerics def apply[RestApi: RawRest.AsRawRpc : RestMetadata](
3838
apiImpl: RestApi,
@@ -130,15 +130,14 @@ class RestServlet(
130130
case binary: StreamedBody.RawBinary =>
131131
response.setContentType(binary.contentType)
132132
binary.content
133-
.bufferTumbling(stream.customBatchSize.getOrElse(defaultStreamingBatchSize))
134-
.consumeWith(Consumer.foreach { batch =>
135-
batch.foreach(e => response.getOutputStream.write(e))
133+
.consumeWith(Consumer.foreach { chunk =>
134+
response.getOutputStream.write(chunk)
136135
response.getOutputStream.flush()
137136
})
138137
case jsonList: StreamedBody.JsonList =>
139138
response.setContentType(jsonList.contentType)
140139
jsonList.elements
141-
.bufferTumbling(stream.customBatchSize.getOrElse(defaultStreamingBatchSize))
140+
.bufferTumbling(jsonList.customBatchSize.getOrElse(defaultStreamingBatchSize))
142141
.switchIfEmpty(Observable(Seq.empty))
143142
.zipWithIndex
144143
.consumeWith(Consumer.foreach { case (batch, idx) =>

rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer {
1414

1515
protected def setupServer(server: Server): Unit = {
1616
val servlet = new RestServlet(serverHandle, serverTimeout, maxPayloadSize)
17-
val streamingServlet = new RestServlet(streamingServerHandle, serverTimeout, maxPayloadSize)
17+
val streamingServlet = new RestServlet(streamingServerHandle, serverTimeout, maxPayloadSize, defaultStreamingBatchSize = 1)
1818
val handler = new ServletContextHandler()
1919
handler.addServlet(new ServletHolder(servlet), "/api/*")
2020
handler.addServlet(new ServletHolder(streamingServlet), "/stream-api/*")

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ final class JettyRestClient(
115115
code = response.getStatus,
116116
headers = parseHeaders(response),
117117
body = body,
118-
customBatchSize = Opt.Empty,
119118
)
120119
callback(Success(restResponse))
121120
}
@@ -151,7 +150,6 @@ final class JettyRestClient(
151150
code = httpResp.getStatus,
152151
headers = parseHeaders(httpResp),
153152
body = StreamedBody.fromHttpBody(parseHttpBody(httpResp, this)),
154-
customBatchSize = Opt.Empty,
155153
)
156154
callback(Success(restResponse))
157155
} else {

rest/src/main/scala/io/udash/rest/annotations.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,15 @@ class addResponseHeader(name: String, value: String) extends ResponseAdjuster wi
367367
override def adjustResponse(response: StreamedRestResponse): StreamedRestResponse = response.header(name, value)
368368
}
369369

370+
/**
371+
* Annotation which may be applied on REST API methods to change streaming batch size when [[StreamedBody.JsonList]]
372+
* bodies are used. It has no effect when applied to other body types or non-streaming methods.
373+
*/
370374
class streamingResponseBatchSize(size: Int) extends StreamedResponseAdjuster {
371375
override def adjustResponse(response: StreamedRestResponse): StreamedRestResponse =
372-
response.copy(customBatchSize = Opt.some(size))
376+
response.body match {
377+
case jsonList: StreamedBody.JsonList =>
378+
response.copy(body = jsonList.copy(customBatchSize = Opt.some(size)))
379+
case _ => response
380+
}
373381
}

rest/src/main/scala/io/udash/rest/raw/RestResponse.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ final case class StreamedRestResponse(
121121
code: Int,
122122
headers: IMapping[PlainValue],
123123
body: StreamedBody,
124-
customBatchSize: Opt[Int] = Opt.Empty,
125124
) extends AbstractRestResponse {
126125

127126
def header(name: String, value: String): StreamedRestResponse =

rest/src/main/scala/io/udash/rest/raw/StreamedBody.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package io.udash
22
package rest.raw
33

44
import com.avsystem.commons.annotation.explicitGenerics
5-
import com.avsystem.commons.misc.ImplicitNotFound
5+
import com.avsystem.commons.misc.{ImplicitNotFound, Opt}
66
import com.avsystem.commons.rpc.{AsRaw, AsRawReal, AsReal}
77
import com.avsystem.commons.serialization.GenCodec.ReadFailure
88
import monix.reactive.Observable
@@ -49,6 +49,7 @@ object StreamedBody extends StreamedBodyLowPrio {
4949
final case class JsonList(
5050
elements: Observable[JsonValue],
5151
charset: String = HttpBody.Utf8Charset,
52+
customBatchSize: Opt[Int] = Opt.Empty,
5253
) extends NonEmpty {
5354
val contentType: String = s"${HttpBody.JsonType};charset=$charset"
5455

rest/src/test/scala/io/udash/rest/RestApiTest.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,18 @@ trait RestApiTestScenarios extends RestApiTest {
146146
}
147147
}
148148

149-
// TODO streaming MORE tests: cancellation, timeouts, errors, errors after sending a few elements, custom format, slow source observable
149+
// TODO streaming MORE tests: cancellation
150150
trait StreamingRestApiTestScenarios extends RestApiTest {
151151

152152
"empty GET stream" in {
153153
testStream(_.simpleStream(0))
154154
}
155155

156-
"trivial GET stream" in {
156+
"trivial GET stream - single batch" in {
157+
testStream(_.simpleStream(1))
158+
}
159+
160+
"trivial GET stream - multi batch" in {
157161
testStream(_.simpleStream(5))
158162
}
159163

rest/src/test/scala/io/udash/rest/StreamingRestTestApi.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ object CustomStream extends GenCodecRestImplicits {
4545
}
4646

4747
trait StreamingRestTestApi {
48+
@streamingResponseBatchSize(3)
4849
@GET def simpleStream(size: Int): Observable[Int]
4950

5051
@GET def jsonStream: Observable[RestEntity]

rest/src/test/scala/io/udash/rest/raw/RawRestTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class RawRestTest extends AnyFunSuite with ScalaFutures with Matchers {
122122

123123
val bodyReprTask: Task[String] = resp.body match {
124124
case StreamedBody.Empty => Task.now("")
125-
case StreamedBody.JsonList(elements, charset) =>
125+
case StreamedBody.JsonList(elements, charset, _) =>
126126
elements.toListL.map { list =>
127127
s" application/json;charset=$charset\n[${list.map(_.value).mkString(",")}]"
128128
}
@@ -506,7 +506,7 @@ class RawRestTest extends AnyFunSuite with ScalaFutures with Matchers {
506506
HttpBody.Empty
507507
)
508508
whenReady(serverHandleWithStreaming(request).runToFuture) {
509-
case StreamedRestResponse(code, headers, body, batchSize) =>
509+
case StreamedRestResponse(code, headers, body) =>
510510
code shouldBe 200
511511
val elements = castOrFail[StreamedBody.JsonList](body).elements
512512
whenReady(elements.toListL.runToFuture) { e =>

0 commit comments

Comments
 (0)