提示:此答案基于akka-stream-experimental
version 2.0-M2
。API 在其他版本中可能略有不同。
关闭连接的一种简单方法是使用PushStage
:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
在客户端或服务器端接收到的每个元素(以及通常通过 a 的每个元素Flow
)都会通过这样的Stage
组件。在 Akka 中,全抽象被称为GraphStage
,更多信息可以在官方文档中找到。
使用 aPushStage
我们可以观察具体传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye
收到消息,我们就会完成上下文,否则我们只需通过push
方法转发值。
现在,我们可以closeClient
通过该方法将组件连接到任意流transform
:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
上面的流程接收 aByteString
并返回 a ByteString
,这意味着它可以connection
通过该join
方法连接。在流程内部,我们首先将字节转换为字符串,然后再将它们发送到closeClient
. 如果PushStage
没有完成流,则元素在流中被转发,在那里它被丢弃并被来自标准输入的一些输入替换,然后通过线路发送回。如果流完成了,舞台组件之后的所有进一步的流处理步骤都将被删除 - 流现在已关闭。