我正在尝试使用 akka-streams 和 akka-http 来解决以下问题:
- 我们有 2 个 http 客户端(A 和 B)向 1 个 http 服务器(C)发送请求。双方使用akka-http进行通信。
- 来自 A 的请求与 B 相比具有更高的优先级,但两个请求应该是平等的进程。所以 C 应该首先处理来自 A 的请求,来自 B 的请求是第二优先级的
- 当然,我们希望在每一端都启用背压
我想出了以下代码,以便将传入连接合并到一个输出:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val merge = b.add(MergePreferred[IncomingConnection](1))
val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)
inA ~> merge.preferred
inB ~> merge.in(0)
merge.out ~> Sink.foreach(println)
ClosedShape
}).run()
所以,我有一个来自 A 和 B 的 IncomingConnection inctances 的源。
现在我想以某种方式处理它们,产生响应并将响应发送到相应的连接。
也许有更好的方法来存档所有这些东西,但我在文档或其他人的问题中找不到任何解决此类问题的示例。
另外我猜这个问题很常见。
在此先感谢您的帮助。