Skip to content

Commit f31ac3b

Browse files
committed
Remove on-the-fly zip
1 parent df5eb3c commit f31ac3b

File tree

3 files changed

+16
-23
lines changed

3 files changed

+16
-23
lines changed

src/main/resources/application.conf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ pekko {
1616
http.server.parsing.max-content-length = 104857600
1717
http.client.parsing.max-content-length = 104857600
1818
http.parsing.max-to-strict-bytes = 104857600
19-
# For large files raise request timeout (default: 20) using directives, eg withRequestTimeout
2019
# https://pekko.apache.org/docs/pekko-http/current///common/timeouts.html#server-timeouts
21-
# This seems NOT be honored anymore
22-
# http.server.request-timeout = 60 seconds
20+
http.server.request-timeout = 180 seconds
2321

2422
# Keep the connection alive in WebsocketEcho
2523
# https://pekko.apache.org/docs/pekko-http/current/server-side/websocket-support.html
@@ -31,7 +29,9 @@ pekko {
3129
http.host-connection-pool {
3230
max-connections = 10
3331
max-retries = 5
32+
max-open-requests = 64
3433
}
34+
3535
}
3636

3737
# Custom dispatcher used to show blocking behaviour

src/main/scala/akkahttp/HttpFileEcho.scala

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import org.apache.pekko.http.scaladsl.server.Route
1010
import org.apache.pekko.http.scaladsl.server.directives.FileInfo
1111
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
1212
import org.apache.pekko.stream.RestartSettings
13-
import org.apache.pekko.stream.scaladsl.{Compression, FileIO, RestartSource, Sink, Source}
13+
import org.apache.pekko.stream.scaladsl.{FileIO, RestartSource, Sink, Source}
1414
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
1515

1616
import java.io.File
@@ -45,9 +45,7 @@ trait JsonProtocol extends DefaultJsonProtocol with SprayJsonSupport {
4545
* Added:
4646
* - Retry on upload/download
4747
* Doc: https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams
48-
* - On the fly gzip compression on upload and gunzip decompression on download
49-
* Doc: https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#dealing-with-compressed-data-streams
50-
* - Browser client for manual upload of uncompressed files
48+
* - Browser client for manual upload
5149
*
5250
* To prove that the streaming works:
5351
* - Replace testfile.jpg with a large file, eg 63MB.pdf
@@ -65,7 +63,7 @@ object HttpFileEcho extends App with JsonProtocol {
6563
val chuckSizeBytes = 100 * 1024 // to handle large files
6664

6765
server(address, port)
68-
(1 to 5).par.foreach(each => roundtripClient(each, address, port))
66+
(1 to 50).par.foreach(each => roundtripClient(each, address, port))
6967
browserClient()
7068

7169
def server(address: String, port: Int): Unit = {
@@ -81,8 +79,6 @@ object HttpFileEcho extends App with JsonProtocol {
8179

8280
def routes: Route = logRequestResult("fileecho") {
8381
path("upload") {
84-
withRequestTimeout(60.seconds) {
85-
8682
formFields(Symbol("payload")) { payload =>
8783
println(s"Server received request with additional form data: $payload")
8884

@@ -97,7 +93,6 @@ object HttpFileEcho extends App with JsonProtocol {
9793
complete(Future(FileHandle(uploadedFile.getName, uploadedFile.getAbsolutePath, uploadedFile.length())))
9894
}
9995
}
100-
}
10196
} ~
10297
path("download") {
10398
get {
@@ -155,15 +150,15 @@ object HttpFileEcho extends App with JsonProtocol {
155150
def createEntityFrom(file: File): Future[RequestEntity] = {
156151
require(file.exists())
157152

158-
val compressedFileSource = FileIO.fromPath(file.toPath, chuckSizeBytes).via(Compression.gzip)
153+
val fileSource = FileIO.fromPath(file.toPath, chuckSizeBytes)
159154
val formData = Multipart.FormData(Multipart.FormData.BodyPart(
160155
"binary",
161-
HttpEntity(MediaTypes.`application/octet-stream`, file.length(), compressedFileSource),
156+
HttpEntity(MediaTypes.`application/octet-stream`, file.length(), fileSource),
162157
// Set the Content-Disposition header
163158
// see: https://www.w3.org/Protocols/HTTP/Issues/content-disposition.txt
164159
Map("filename" -> file.getName)),
165160
// Pass additional (json) payload in a form field
166-
Multipart.FormData.BodyPart.Strict("payload", "{\"payload\": \"sent from Scala client\"}", Map.empty)
161+
Multipart.FormData.BodyPart.Strict("payload", s"{\"payload\": \"sent from Scala client with id: $id\"}", Map.empty)
167162
)
168163

169164
Marshal(formData).to[RequestEntity]
@@ -235,7 +230,6 @@ object HttpFileEcho extends App with JsonProtocol {
235230

236231
def saveResponseToFile(response: HttpResponse, localFile: File) = {
237232
response.entity.dataBytes
238-
.via(Compression.gunzip(chuckSizeBytes))
239233
.runWith(FileIO.toPath(Paths.get(localFile.getAbsolutePath)))
240234
}
241235

@@ -247,7 +241,7 @@ object HttpFileEcho extends App with JsonProtocol {
247241
downloaded <- saveResponseToFile(response, localFile)
248242
} yield downloaded
249243

250-
val ioresult = Await.result(result, 1.minute)
244+
val ioresult = Await.result(result, 180.seconds)
251245
println(s"DownloadClient with id: $id finished downloading: ${ioresult.count} bytes to file: ${localFile.getAbsolutePath}")
252246
}
253247

src/main/scala/akkahttp/HttpFileEchoStream.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,17 @@ import scala.util.{Failure, Success}
2828
* https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-the-host-level-api-with-a-queue
2929
* https://doc.akka.io/docs/akka-http/current/client-side/host-level.html?language=scala#retrying-a-request
3030
*
31-
*
3231
* Remarks:
3332
* - No retry on upload because POST request is non-idempotent
34-
* - Homegrown retry on download, because this does somehow not work yet via the cachedHostConnectionPool
35-
*
33+
* - Homegrown retry on download, because this does somehow not work yet via the cachedHostConnectionPool$
34+
* - Shows more robust behaviour with large files than [[akkahttp.HttpFileEcho]]
3635
*/
3736
object HttpFileEchoStream extends App with JsonProtocol {
3837
implicit val system: ActorSystem = ActorSystem()
3938

4039
import system.dispatcher
4140

42-
val resourceFileName = "testfile.jpg"
41+
val resourceFileName = "63MB.pdf"
4342
val (address, port) = ("127.0.0.1", 6000)
4443
server(address, port)
4544
roundtripClient(address, port)
@@ -76,7 +75,7 @@ object HttpFileEchoStream extends App with JsonProtocol {
7675
println(s"Server: Received download request for: ${fileHandle.fileName}")
7776

7877
// Activate to simulate rnd server ex during download
79-
throwRndRuntimeException("download")
78+
//throwRndRuntimeException("download")
8079

8180
getFromFile(new File(fileHandle.absolutePath), MediaTypes.`application/octet-stream`)
8281
}
@@ -107,8 +106,8 @@ object HttpFileEchoStream extends App with JsonProtocol {
107106
def roundtripClient(address: String, port: Int) = {
108107

109108
val filesToUpload =
110-
// Unbounded stream. Limited for testing purposes by appending eg .take(5)
111-
Source(LazyList.continually(FileHandle(resourceFileName, Paths.get(s"src/main/resources/$resourceFileName").toString, 0))).take(10)
109+
// Unbounded stream. Limited for testing purposes by appending eg .take(n)
110+
Source(LazyList.continually(FileHandle(resourceFileName, Paths.get(s"src/main/resources/$resourceFileName").toString, 0))).take(100)
112111

113112
val hostConnectionPoolUpload = Http().cachedHostConnectionPool[FileHandle](address, port)
114113

0 commit comments

Comments
 (0)