Akka-http文档说:
除了将服务器端绑定的套接字视为 Source[IncomingConnection] 并将每个连接视为带有 Sink[HttpResponse] 的 Source[HttpRequest]
假设我们得到包含来自许多 Source[IncomingConnection] 的传入连接的合并源。
然后,假设我们从 Source[IncomingConnection] 获得 Source[HttpRequest](见下面的代码)。
然后,没问题,我们可以提供一个将 HttpRequest 转换为 HttpResponse 的流程。
这就是问题所在 - 我们如何正确接收响应?我们如何加入对连接的响应?
用例背后的整个想法是可以优先处理来自不同连接的传入请求。我猜在很多情况下应该很有用......
提前致谢!
编辑: 基于@RamonJRomeroyVigil 答案的解决方案:
服务器代码:
val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)
val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
val merge = b.add(Merge[IncomingConnection](2))
in1 ~> print("in1") ~> merge.in(0)
in2 ~> print("in2") ~> merge.in(1)
SourceShape(merge.out)
})
val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
connSrc.flatMapConcat { conn =>
Source.empty[HttpResponse]
.via(conn.flow)
.map(request => (request, conn))
}
val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
Flow[(HttpRequest, IncomingConnection)].map{
case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
println(s"${System.currentTimeMillis()}: " +
s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
(HttpResponse(entity = "pong"), conn)
}
reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()
def print(prefix: String) = Flow[IncomingConnection].map { s =>
println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s
}
所以,我从控制台使用 curl 并看到以下内容:
% curl http://localhost:8200/ping
curl: (52) Empty reply from server
第二个 curl 请求失败:
% curl http://localhost:8200/ping
curl: (7) Failed to connect to localhost port 8200: Connection refused
在服务器控制台上,我在发送第一个请求时看到以下内容:
in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
发送第二个请求时什么也没有。
因此,内部连接流(如@RamonJRomeroyVigil 所述)或其他问题似乎存在一些问题......
基本上代码不起作用。
仍在调查问题。