5

在我们基于 akka-streams 的 Tcp 服务器中,close() 的预期行为是客户端应该立即看到 SocketException,并且在异常之后发布的任何消息都不应该传递到服务器。但是,必须传递异常之前发送的最后一条消息。所有新客户端都应该遇到 ConnectException。

无法破译akka-streams 1.0 文档中关于“关闭连接”的段落,我们正在为 Tcp 服务器使用如下实现 close():

  val serverSource: Promise[ByteString] = Promise[ByteString]()

  var serverBindingOption: Option[Tcp.ServerBinding] = None

  def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[SyslogStreamServer] = {
    implicit val executionContext = system.dispatcher
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = MaxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(serverSource.future).drop(1)) // drop the last one for no response
    }

    for {
      serverBinding ← Tcp().bind(address, port).to(sink).run()
    } yield {
      serverBindingOption = Some(serverBinding)
      new SyslogStreamServer
    }
  }

  def close()(implicit ec: ExecutionContext): Future[Unit] = {
    serverSource.success(ByteString.empty) // dummy value to be dropped above
    serverBindingOption match {
      case Some(serverBinding) ⇒ serverBinding.unbind()
      case None ⇒ Future[Unit] {}
    }
  }

这是关闭服务器的好方法吗?(用于部署等)

如果有人可以审查和评论上述 close() 实现的有效性。我们试图证明或反驳自己此代码是否满足所有要求,并想知道我们是否应该首先成功流的源,或者先解除绑定 serverBinding ?非常感谢任何评论/批评。提前致谢 !

4

0 回答 0