0

我是 Akka Stream 的新手,我想了解如何为我的项目处理 TCP 套接字。我从Akka Stream 官方文档中获取了这段代码。

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

如果我使用 netcat 从终端连接,我可以看到 Akka Stream TCP 套接字按预期工作。我还发现如果我需要使用用户消息关闭连接,我可以使用takeWhile如下

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

我找不到的是如何管理由CMD + C操作关闭的套接字。PeerCloseAkka Stream 内部使用 Akka.io 管理 TCP 连接,因此它必须在套接字关闭时发送一些消息。所以,我对 Akka.io 的理解告诉我,我应该收到来自套接字关闭的反馈,但我找不到如何使用 Akka Stream 来做到这一点。有没有办法管理它?

4

1 回答 1

1

connection.handleWith(echo)是语法糖,connection.flow.joinMat(echo)(Keep.right).run()其物化值为echo,通常没有用。 Flow.via.map.takeWhile具有NotUsed作为物化的价值,所以这也基本上没用。但是,您可以附加以echo不同方式实现的阶段。

其中之一是.watchTermination

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))
    // change the materialized value to a Future[Done]
    .watchTermination()(Keep.right)

  // you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
  //  if you don't already
  connection.handleWith(echo).onComplete {
    case Success(_) => println("stream completed successfully")
    case Failure(e) => println(e.getMessage)
  }
}

这将无法区分您端或远程端是否正常关闭连接;它将区分流失败。

于 2021-02-13T05:14:59.203 回答