在我们基于 akka-streams 的 Tcp 服务器中,close() 的预期行为是客户端应该立即看到 SocketException,并且在异常之后发布的任何消息都不应该传递到服务器。但是,必须传递异常之前发送的最后一条消息。所有新客户端都应该遇到 ConnectException。
无法破译akka-streams 1.0 文档中关于“关闭连接”的段落,我们正在为 Tcp 服务器使用如下实现 close():
val serverSource: Promise[ByteString] = Promise[ByteString]()
var serverBindingOption: Option[Tcp.ServerBinding] = None
def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
implicit val executionContext = system.dispatcher
val sink = Sink.foreach {
conn: Tcp.IncomingConnection =>
val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))
val targetSink = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
.map(raw ⇒ Message(raw))
.to(Sink(targetSubscriber))
conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // drop the last one for no response
}
for {
serverBinding ← Tcp().bind(address, port).to(sink).run()
} yield {
serverBindingOption = Some(serverBinding)
new SyslogStreamServer
}
}
def close()(implicit ec: ExecutionContext): Future[Unit] = {
serverSource.success(ByteString.empty) // dummy value to be dropped above
serverBindingOption match {
case Some(serverBinding) ⇒ serverBinding.unbind()
case None ⇒ Future[Unit] {}
}
}
这是关闭服务器的好方法吗?(用于部署等)
如果有人可以审查和评论上述 close() 实现的有效性。我们试图证明或反驳自己此代码是否满足所有要求,并想知道我们是否应该首先成功流的源,或者先解除绑定 serverBinding ?非常感谢任何评论/批评。提前致谢 !