Skip to content

Commit 5ccc372

Browse files
committed
Add HTTP Header X-Content-Hash as an example of an on-the-fly processing scenario
1 parent f749e1f commit 5ccc372

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

src/main/scala/akkahttp/ReverseProxy.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package akkahttp
33
import akkahttp.ReverseProxy.Mode.Mode
44
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
55
import io.circe.*
6+
import org.apache.pekko.NotUsed
67
import org.apache.pekko.actor.ActorSystem
78
import org.apache.pekko.http.scaladsl.model.*
89
import org.apache.pekko.http.scaladsl.model.Uri.Authority
@@ -13,9 +14,11 @@ import org.apache.pekko.http.scaladsl.settings.ServerSettings
1314
import org.apache.pekko.http.scaladsl.{Http, HttpExt}
1415
import org.apache.pekko.pattern.{CircuitBreaker, CircuitBreakerOpenException}
1516
import org.apache.pekko.stream.ThrottleMode
16-
import org.apache.pekko.stream.scaladsl.{Sink, Source}
17+
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
18+
import org.apache.pekko.util.ByteString
1719
import org.slf4j.{Logger, LoggerFactory}
1820

21+
import java.security.MessageDigest
1922
import java.util.concurrent.ConcurrentHashMap
2023
import java.util.concurrent.atomic.AtomicInteger
2124
import scala.collection.parallel.CollectionConverters.ImmutableIterableIsParallelizable
@@ -28,10 +31,11 @@ import scala.util.{Failure, Success}
2831
* https://github.com/mathieuancelin/akka-http-reverse-proxy
2932
*
3033
* Features ReverseProxy:
31-
* - Weighted round robin load balancing
34+
* - Weighted round-robin load balancing
3235
* - Retry on HTTP 5xx from target servers
3336
* - CircuitBreaker per target server to avoid overload
3437
* - HTTP Header `X-Correlation-ID` for tracing (only for Mode.local)
38+
* - HTTP Header `X-Content-Hash` as an example of an on-the-fly processing scenario
3539
*
3640
* Mode.local:
3741
* HTTP client(s) --> ReverseProxy --> local target server(s)
@@ -42,7 +46,7 @@ import scala.util.{Failure, Success}
4246
* Remarks:
4347
* - The target server selection is via the "Host" HTTP header
4448
* - Local/Remote target servers are designed to be flaky to show Retry/CircuitBreaker behavior
45-
* - On top of the built in client, you may also try other clients
49+
* - On top of the built-in client, you may also try other clients
4650
* - This PoC may not scale well, possible bottlenecks are:
4751
* - Combination of Retry/CircuitBreaker
4852
* - Round robin impl. with `requestCounter` means shared state
@@ -164,6 +168,12 @@ object ReverseProxy extends App {
164168
uri
165169
}
166170

171+
def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
172+
Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
173+
acc._1.update(chunk.toByteBuffer)
174+
(acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
175+
}
176+
167177
services.get(mode) match {
168178
case Some(rawSeq) =>
169179
val seq = rawSeq.flatMap(t => (1 to t.weight).map(_ => t))
@@ -179,8 +189,22 @@ object ReverseProxy extends App {
179189
// If not, clients get 503 from pekko-http
180190
callTimeout = 10.seconds,
181191
resetTimeout = 10.seconds))
182-
val proxyReq = request.withUri(uri(target)).withHeaders(headers(target))
183-
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
192+
193+
// Example of an on-the-fly processing scenario
194+
val hashFuture = request.entity.dataBytes
195+
.via(computeHashWithPayloadAndPayloadLength)
196+
.runWith(Sink.head)
197+
.map { case (digest, _, _) =>
198+
RawHeader("X-Content-Hash", digest.digest().map("%02x".format(_)).mkString)
199+
}
200+
201+
hashFuture.flatMap { hashHeader =>
202+
val proxyReq = request
203+
.withUri(uri(target))
204+
.withHeaders(headers(target) :+ hashHeader)
205+
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
206+
}
207+
184208
}.recover {
185209
case _: CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened")
186210
case _: TimeoutException => GatewayTimeout(id)

0 commit comments

Comments
 (0)