3

我是 Scala 和 Akka 的新手。我有一个简单的 RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach

现在我想要这样的东西:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach

但是 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 Flow1 中的所有 100 个元素)并将这个新元素提供给 Sink。

我做了一些研究,发现了明确的用户定义缓冲区,但我不明白如何从 flow2 中的 flow1 访问所有 100 个元素并对其进行一些转换。有人可以解释一下吗?或者更好地发布一个简单的小例子?或两者?

4

1 回答 1

9

Akka 定义集合

如果您不介意使用 akka 确定的集合类型,那么您可以使用该grouped函数:

//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
                             .runWith(Sink foreach println)

用户定义的集合

如果您想控制用于缓冲区的集合类型,例如 aSeqArray

type MyCollectionType[X] = Array[X]

def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]

然后您可以使用两个 Flows 执行此操作。第一个 Flow 执行 ascan来构建一系列元素:

val bufferSize = 10

def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] = 
  (if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i

val buffer : Flow[Int, MyCollectionType[Int], _] = 
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }

第二个 Flow 是一个filter具有正确大小的序列(即“goldiLocks”):

val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
  Flow[MyCollectionType[Int]].filter(_.size == bufferSize)

这两个 Flow 可以组合起来生成一个 Stream,它将生成所需的集合类型:

val stream = Source(1 to 100).via(buffer)
                             .via(goldiLocks)
                             .runWith(Sink foreach println)
于 2015-10-22T12:31:00.050 回答