Skip to content

Commit 77fe745

Browse files
committed
Customizable response batch size
1 parent ccfc100 commit 77fe745

File tree

7 files changed

+45
-28
lines changed

7 files changed

+45
-28
lines changed

rest/.jvm/src/main/scala/io/udash/rest/RestServlet.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ object RestServlet {
2222
final val DefaultHandleTimeout = 30.seconds
2323
final val DefaultMaxPayloadSize = 16 * 1024 * 1024L // 16MB
2424
final val CookieHeader = "Cookie"
25+
final val DefaultStreamingBatchSize = 1
2526
private final val BufferSize = 8192
2627

2728
/**
@@ -31,21 +32,29 @@ object RestServlet {
3132
* @param handleTimeout maximum time the servlet will wait for results returned by REST API implementation
3233
* @param maxPayloadSize maximum acceptable incoming payload size, in bytes;
3334
* if exceeded, `413 Payload Too Large` response will be sent back
35+
* @param defaultStreamingBatchSize default batch size for [[StreamedRestResponse]]
3436
*/
3537
@explicitGenerics def apply[RestApi: RawRest.AsRawRpc : RestMetadata](
3638
apiImpl: RestApi,
3739
handleTimeout: FiniteDuration = DefaultHandleTimeout,
3840
maxPayloadSize: Long = DefaultMaxPayloadSize,
41+
defaultStreamingBatchSize: Int = DefaultStreamingBatchSize,
3942
)(implicit
4043
scheduler: Scheduler
4144
): RestServlet =
42-
new RestServlet(RawRest.asHandleRequestWithStreaming[RestApi](apiImpl), handleTimeout, maxPayloadSize)
45+
new RestServlet(
46+
handleRequest = RawRest.asHandleRequestWithStreaming[RestApi](apiImpl),
47+
handleTimeout = handleTimeout,
48+
maxPayloadSize = maxPayloadSize,
49+
defaultStreamingBatchSize = defaultStreamingBatchSize,
50+
)
4351
}
4452

4553
class RestServlet(
4654
handleRequest: RawRest.HandleRequestWithStreaming,
4755
handleTimeout: FiniteDuration = DefaultHandleTimeout,
4856
maxPayloadSize: Long = DefaultMaxPayloadSize,
57+
defaultStreamingBatchSize: Int = DefaultStreamingBatchSize,
4958
)(implicit
5059
scheduler: Scheduler
5160
) extends HttpServlet with LazyLogging {
@@ -120,14 +129,16 @@ class RestServlet(
120129
Task.eval(writeNonEmptyBody(response, single.body))
121130
case binary: StreamedBody.RawBinary =>
122131
response.setContentType(binary.contentType)
123-
binary.content.bufferTumbling(stream.batchSize).consumeWith(Consumer.foreach { batch =>
124-
batch.foreach(e => response.getOutputStream.write(e))
125-
response.getOutputStream.flush()
126-
})
132+
binary.content
133+
.bufferTumbling(stream.customBatchSize.getOrElse(defaultStreamingBatchSize))
134+
.consumeWith(Consumer.foreach { batch =>
135+
batch.foreach(e => response.getOutputStream.write(e))
136+
response.getOutputStream.flush()
137+
})
127138
case jsonList: StreamedBody.JsonList =>
128139
response.setContentType(jsonList.contentType)
129140
jsonList.elements
130-
.bufferTumbling(stream.batchSize)
141+
.bufferTumbling(stream.customBatchSize.getOrElse(defaultStreamingBatchSize))
131142
.switchIfEmpty(Observable(Seq.empty))
132143
.zipWithIndex
133144
.consumeWith(Consumer.foreach { case (batch, idx) =>

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import io.udash.rest.util.Utils
99
import io.udash.utils.URLEncoder
1010
import monix.eval.Task
1111
import monix.execution.{Callback, Scheduler}
12+
import monix.reactive.Observable
1213
import monix.reactive.OverflowStrategy.Unbounded
13-
import monix.reactive.{MulticastStrategy, Observable}
1414
import monix.reactive.subjects.{ConcurrentSubject, PublishToOneSubject}
1515
import org.eclipse.jetty.client.*
1616
import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes}
@@ -31,7 +31,8 @@ import scala.concurrent.duration.*
3131
* @param client The configured Jetty `HttpClient` instance.
3232
* @param defaultMaxResponseLength Default maximum size (in bytes) for buffering non-streamed responses.
3333
* @param defaultTimeout Default timeout for requests.
34-
*/final class JettyRestClient(
34+
*/
35+
final class JettyRestClient(
3536
client: HttpClient,
3637
defaultMaxResponseLength: Int = JettyRestClient.DefaultMaxResponseLength,
3738
defaultTimeout: Duration = JettyRestClient.DefaultTimeout,
@@ -115,7 +116,7 @@ import scala.concurrent.duration.*
115116
code = response.getStatus,
116117
headers = parseHeaders(response),
117118
body = body,
118-
batchSize = 1,
119+
customBatchSize = Opt.Empty,
119120
)
120121
callback(Success(restResponse))
121122
}
@@ -149,7 +150,7 @@ import scala.concurrent.duration.*
149150
code = httpResp.getStatus,
150151
headers = parseHeaders(httpResp),
151152
body = StreamedBody.fromHttpBody(parseHttpBody(httpResp, this)),
152-
batchSize = 1,
153+
customBatchSize = Opt.Empty,
153154
)
154155
callback(Success(restResponse))
155156
} else {
@@ -165,8 +166,8 @@ import scala.concurrent.duration.*
165166
}
166167

167168
/**
168-
* Creates a `RawRest.HandleRequest` which handles standard REST requests by buffering the entire response.
169-
* This does *not* support streaming responses.
169+
* Creates a [[RawRest.HandleRequest]] which handles standard REST requests by buffering the entire response.
170+
* This does <b>not</b> support streaming responses.
170171
*
171172
* @param baseUrl The base URL for the REST service.
172173
* @param customMaxResponseLength Optional override for the maximum response length.

rest/src/main/scala/io/udash/rest/annotations.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.udash
22
package rest
33

4+
import com.avsystem.commons.Opt
45
import com.avsystem.commons.annotation.{AnnotationAggregate, defaultsToName}
56
import com.avsystem.commons.meta.RealSymAnnotation
6-
import com.avsystem.commons.rpc._
7+
import com.avsystem.commons.rpc.*
78
import com.avsystem.commons.serialization.optionalParam
8-
import io.udash.rest.raw._
9+
import io.udash.rest.raw.*
910

1011
import scala.annotation.StaticAnnotation
1112

@@ -362,6 +363,11 @@ class addRequestHeader(name: String, value: String) extends RequestAdjuster {
362363
* HTTP header to all outgoing responses generated for invocations of that method on the server side.
363364
*/
364365
class addResponseHeader(name: String, value: String) extends ResponseAdjuster with StreamedResponseAdjuster {
365-
def adjustResponse(response: RestResponse): RestResponse = response.header(name, value)
366+
override def adjustResponse(response: RestResponse): RestResponse = response.header(name, value)
366367
override def adjustResponse(response: StreamedRestResponse): StreamedRestResponse = response.header(name, value)
367368
}
369+
370+
class streamingResponseBatchSize(size: Int) extends StreamedResponseAdjuster {
371+
override def adjustResponse(response: StreamedRestResponse): StreamedRestResponse =
372+
response.copy(customBatchSize = Opt.some(size))
373+
}

rest/src/main/scala/io/udash/rest/raw/RawRest.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ trait RawRest {
142142
RawRest.resolveAndHandle(metadata)(handleResolvedWithStreaming)
143143

144144
/**
145-
* Handles a resolved REST call and returns a standard RestResponse.
145+
* Handles a resolved REST call and returns a standard [[RestResponse]].
146146
* This method is maintained for backward compatibility with non-streaming clients.
147-
* It delegates to handleResolvedWithStreaming and converts any streaming response to a standard response.
147+
* It delegates to [[handleResolvedWithStreaming]] and converts any streaming response to a standard response.
148148
*/
149149
def handleResolved(request: RestRequest, resolved: ResolvedCall): Task[RestResponse] =
150150
StreamedRestResponse.fallbackToRestResponse(handleResolvedWithStreaming(request, resolved))
@@ -254,7 +254,7 @@ object RawRest extends RawRpcCompanion[RawRest] {
254254
@implicitNotFound(InvalidTraitMessage)
255255
implicit def rawRestAsRawNotFound[T]: ImplicitNotFound[AsRaw[RawRest, T]] = ImplicitNotFound()
256256

257-
// client side
257+
// client side without response streaming support
258258
def fromHandleRequest[Real: AsRealRpc : RestMetadata](handle: HandleRequest): Real =
259259
RawRest.asReal(new DefaultRawRest(Nil, RestMetadata[Real], RestParameters.Empty, new RawRest.RestRequestHandler {
260260
override def handleRequest(request: RestRequest): Task[RestResponse] = handle(request)
@@ -266,7 +266,7 @@ object RawRest extends RawRpcCompanion[RawRest] {
266266
def fromHandleRequestWithStreaming[Real: AsRealRpc : RestMetadata](handleRequest: RawRest.RestRequestHandler): Real =
267267
RawRest.asReal(new DefaultRawRest(Nil, RestMetadata[Real], RestParameters.Empty, handleRequest))
268268

269-
// server side
269+
// server side without response streaming support
270270
def asHandleRequest[Real: AsRawRpc : RestMetadata](real: Real): HandleRequest =
271271
RawRest.asRaw(real).asHandleRequest(RestMetadata[Real])
272272

rest/src/main/scala/io/udash/rest/raw/RestResponse.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,13 @@ trait RestResponseLowPrio { this: RestResponse.type =>
113113

114114
/**
115115
* Streaming REST response containing a status code, headers, and a streamed body.
116-
* Unlike standard RestResponse, the body content can be delivered incrementally through a reactive stream.
116+
* Unlike standard [[RestResponse]], the body content can be delivered incrementally through a reactive stream.
117117
*/
118118
final case class StreamedRestResponse(
119119
code: Int,
120120
headers: IMapping[PlainValue],
121121
body: StreamedBody,
122-
batchSize: Int,
122+
customBatchSize: Opt[Int] = Opt.Empty,
123123
) extends AbstractRestResponse {
124124

125125
def header(name: String, value: String): StreamedRestResponse =
@@ -132,7 +132,7 @@ final case class StreamedRestResponse(
132132
object StreamedRestResponse extends StreamedRestResponseLowPrio {
133133

134134
/**
135-
* Converts a StreamedRestResponse to a standard RestResponse by materializing streamed content.
135+
* Converts a [[StreamedRestResponse]] to a standard [[RestResponse]] by materializing streamed content.
136136
* This is useful for compatibility with APIs that don't support streaming.
137137
*/
138138
def fallbackToRestResponse(response: StreamedRestResponse): Task[RestResponse] = {
@@ -158,7 +158,7 @@ object StreamedRestResponse extends StreamedRestResponseLowPrio {
158158
}
159159

160160
/**
161-
* Converts any AbstractRestResponse to a standard RestResponse by materializing streamed content if necessary.
161+
* Converts any [[AbstractRestResponse]] to a standard [[RestResponse]] by materializing streamed content if necessary.
162162
* This is useful for compatibility with APIs that don't support streaming.
163163
*/
164164
def fallbackToRestResponse(response: Task[AbstractRestResponse]): Task[RestResponse] =
@@ -168,7 +168,7 @@ object StreamedRestResponse extends StreamedRestResponseLowPrio {
168168
}
169169

170170
def fromHttpError(error: HttpErrorException): StreamedRestResponse =
171-
StreamedRestResponse(error.code, IMapping.empty, StreamedBody.fromHttpBody(error.payload), 1)
171+
StreamedRestResponse(error.code, IMapping.empty, StreamedBody.fromHttpBody(error.payload))
172172

173173
class LazyOps(private val resp: () => StreamedRestResponse) extends AnyVal {
174174
def recoverHttpError: StreamedRestResponse = try resp() catch {

rest/src/main/scala/io/udash/rest/raw/StreamedBody.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ sealed trait StreamedBody {
2121
code = defaultStatus,
2222
headers = IMapping.empty,
2323
body = this,
24-
batchSize = 1,
2524
)
2625
}
2726
object StreamedBody extends StreamedBodyLowPrio {

rest/src/test/scala/io/udash/rest/raw/RawRestTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ trait RootApi {
8484
@POST @CustomBody def echoHeaders(headers: Map[String, String]): Future[WithHeaders[Unit]]
8585
}
8686
object RootApi extends DefaultRestApiCompanion[RootApi]
87+
8788
class RawRestTest extends AnyFunSuite with ScalaFutures with Matchers {
8889
implicit def scheduler: Scheduler = Scheduler.global
8990

@@ -206,7 +207,7 @@ class RawRestTest extends AnyFunSuite with ScalaFutures with Matchers {
206207
serverHandleWithStreaming(request).flatMap {
207208
case stream: StreamedRestResponse => Task.now(stream)
208209
case resp: RestResponse =>
209-
Task(StreamedRestResponse(resp.code, resp.headers, StreamedBody.fromHttpBody(resp.body), 1))
210+
Task(StreamedRestResponse(resp.code, resp.headers, StreamedBody.fromHttpBody(resp.body)))
210211
}
211212
})
212213

@@ -505,13 +506,12 @@ class RawRestTest extends AnyFunSuite with ScalaFutures with Matchers {
505506
HttpBody.Empty
506507
)
507508
whenReady(serverHandleWithStreaming(request).runToFuture) {
508-
case StreamedRestResponse(code, headers, body, batchSize) => {
509+
case StreamedRestResponse(code, headers, body, batchSize) =>
509510
code shouldBe 200
510511
val elements = castOrFail[StreamedBody.JsonList](body).elements
511512
whenReady(elements.toListL.runToFuture) { e =>
512513
e shouldBe List(JsonValue("1"), JsonValue("2"), JsonValue("3"), JsonValue("4"))
513514
}
514-
}
515515
case _ => fail()
516516
}
517517
}

0 commit comments

Comments
 (0)