3

我正在尝试提出一种解决方案,将我收到的传入字符串拆分为多个字符串。我一直在研究,看起来在以前版本的 Akka-Streams 中有一个类Transformer可以扩展来进行这种转换。

在我使用的版本(RC2)中有Stages 但我不确定如何实现拆分模式。

Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))

我正在寻找XXXXX允许我输入 aString并返回一个序列String并将每个序列发送到流的其余部分的组件。

4

2 回答 2

5

我同意@jrudolph 的观点,这mapConcat可能就是您正在寻找的。一个简单的例子展示了这个方法的实际效果:

  val strings = List(
  """hello
     world
     test
     this""",
     """foo
     bar
     baz
     """

  )

  implicit val system = ActorSystem("test")
  implicit val mater = ActorFlowMaterializer()
  Source(strings).
    mapConcat(_.split("\n").map(_.trim).toList).
    runForeach(println)

如果您运行此代码,您将看到打印出以下内容:

hello
world
test
this
foo     
bar
baz
于 2015-05-20T11:20:39.263 回答
1

AkkaFraming为此类问题提供了辅助函数。

假设您的字符集是 UTF-8,您可以编写一个函数,该函数接受分隔String值的最大大小并返回一个Flow可以执行拆分的函数:

import akka.stream.scaladsl.Framing
import akka.util.ByteString

val newLineSplitter : (Int) => Flow[String, String, NotUsed] = 
  (maxLineSize) =>
    Flow[String]
      .map(ByteString.apply)
      .via(Framing delimiter (ByteString("\n"), maxLineSize))
      .via(Flow[ByteString] map (_.utf8String))
于 2017-08-08T14:46:25.203 回答