@@ -16,6 +16,7 @@ import org.apache.pekko.pattern.{CircuitBreaker, CircuitBreakerOpenException}
16
16
import org .apache .pekko .stream .ThrottleMode
17
17
import org .apache .pekko .stream .scaladsl .{Flow , Sink , Source }
18
18
import org .apache .pekko .util .ByteString
19
+ import org .bouncycastle .util .encoders .Hex
19
20
import org .slf4j .{Logger , LoggerFactory }
20
21
21
22
import java .security .MessageDigest
@@ -172,12 +173,19 @@ object ReverseProxy extends App {
172
173
uri
173
174
}
174
175
175
- def computeHashWithPayloadAndPayloadLength : Flow [ByteString , (MessageDigest , ByteString , Int ), NotUsed ] =
176
- Flow [ByteString ].fold((MessageDigest .getInstance(" SHA-256" ), ByteString .empty, 0 )) { (acc, chunk) =>
177
- acc._1.update(chunk.toByteBuffer)
178
- (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
176
+ case class HashAccumulator (digest : MessageDigest , payload : ByteString , length : Int )
177
+
178
+ def computeHashFromPayloadAndPayloadLength : Flow [ByteString , HashAccumulator , NotUsed ] =
179
+ Flow [ByteString ].fold(HashAccumulator (
180
+ MessageDigest .getInstance(" SHA-256" ),
181
+ ByteString .empty,
182
+ 0 )) { (acc, chunk) =>
183
+ val bytes = chunk.toArray
184
+ acc.digest.update(bytes, 0 , bytes.length)
185
+ HashAccumulator (acc.digest, acc.payload ++ chunk, acc.length + chunk.length)
179
186
}
180
187
188
+
181
189
services.get(mode) match {
182
190
case Some (rawSeq) =>
183
191
val seq = rawSeq.flatMap(t => (1 to t.weight).map(_ => t))
@@ -196,12 +204,13 @@ object ReverseProxy extends App {
196
204
197
205
// Example of an on-the-fly processing scenario
198
206
val hashFuture = request.entity.dataBytes
199
- .via(computeHashWithPayloadAndPayloadLength )
207
+ .via(computeHashFromPayloadAndPayloadLength )
200
208
.runWith(Sink .head)
201
- .map { case (digest, _, _) =>
202
- RawHeader (" X-Content-Hash" , digest.digest().map( " %02x " .format(_)).mkString )
209
+ .map { accumulator =>
210
+ RawHeader (" X-Content-Hash" , Hex .toHexString(accumulator.digest.digest()) )
203
211
}
204
212
213
+
205
214
hashFuture.flatMap { hashHeader =>
206
215
val proxyReq = request
207
216
.withUri(uri(target))
@@ -217,7 +226,6 @@ object ReverseProxy extends App {
217
226
case None => Future .successful(NotFound (id, host))
218
227
}
219
228
}
220
-
221
229
val futReverseProxy = Http ().newServerAt(proxyHost, proxyPort).bind(handlerWithCircuitBreaker)
222
230
223
231
futReverseProxy.onComplete {
0 commit comments