我一直在玩一些实验性的 Akka Streams API,我有一个用例,我想看看如何实现。对于我的用例,我有一个StreamTcp
基础Flow
,它通过将连接的输入流绑定到我的服务器套接字来提供。我拥有的流程基于ByteString
进入其中的数据。传入的数据将有一个分隔符,这意味着我应该将分隔符之前的所有内容视为一条消息,并将下一个分隔符之后的所有内容视为下一条消息。所以玩一个更简单的例子,不使用套接字,只使用静态文本,这就是我想出的:
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
Flow
我发现实现我的目标的主要功能是splitWhen
,然后生成额外的子流,每个.
分隔符对应每个消息。然后,我使用另一个步骤管道处理每个子流,最后在最后打印各个消息。
这一切似乎有点冗长,以完成我认为非常简单和常见的用例。所以我的问题是,是否有一种更简洁、更简洁的方式来执行此操作,或者这是通过分隔符拆分流的正确和首选方式?