0

我正在尝试使用 akka-streams 和 akka-http 来解决以下问题:

  • 我们有 2 个 http 客户端(A 和 B)向 1 个 http 服务器(C)发送请求。双方使用akka-http进行通信。
  • 来自 A 的请求与 B 相比具有更高的优先级,但两个请求应该是平等的进程。所以 C 应该首先处理来自 A 的请求,来自 B 的请求是第二优先级的
  • 当然,我们希望在每一端都启用背压

我想出了以下代码,以便将传入连接合并到一个输出:

val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b: FlowGraph.Builder[Unit] =>
    import FlowGraph.Implicits._

    val merge = b.add(MergePreferred[IncomingConnection](1))

    val inA: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8200)
    val inB: Source[IncomingConnection, Future[ServerBinding]] = Http().bind(interface = "localhost", port = 8201)

    inA ~> merge.preferred
    inB ~> merge.in(0)
           merge.out ~> Sink.foreach(println)

    ClosedShape
}).run()

所以,我有一个来自 A 和 B 的 IncomingConnection inctances 的源。

现在我想以某种方式处理它们,产生响应并将响应发送到相应的连接。

也许有更好的方法来存档所有这些东西,但我在文档或其他人的问题中找不到任何解决此类问题的示例。

另外我猜这个问题很常见。

在此先感谢您的帮助。

4

1 回答 1

1

IncomingConnection对象用于处理请求和返回响应的方式是使用方法handleXXX。基于文档api的示例:

同步功能

//this processor contains your logic for converting a request into a response
def requestProcessor(httpRequest : HttpRequest) : HttpResponse = ???

val connectionSink = Sink.foreach[IncomingConnection] { conn =>
  conn.handleWithSyncHandler(requestProcessor)
}

然后connectionSink可以在您的图形构造中使用它:

inA ~> merge.preferred
inB ~> merge.in(0)
       merge.out ~> connectionSink

异步函数

同样,如果您的处理器是异步的:

val asyncReqProcessor(httpRequest : HttpRequest) : Future[HttpResponse] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWithAsyncHandler asyncProcessor)

流动

最后,您还可以使用 Flow(我的首选方法):

val flowReqProcessor : Flow[HttpRequest, HttpResponse,_] = ???

val connectionSink = 
  Sink.foreach[IncomingConnection](_ handleWith flowReqProcessor )

关于 Flows,我最喜欢的功能之一是您可以在不同的 Streams 中反复使用它们。因此, flowReqProcessor 可以是 aval而不是 a def

于 2015-12-16T12:47:33.043 回答