2

我有一个有序的数据流

A A A B B C C C C ... (very long)

我想将它转换为形式的聚合流(项目,计数):

(A, 3) (B, 2) (C, 4)

我可以在 Akka Streams 中使用哪些运算符?

Source.fromPublisher(publisher)
    .aggregateSomehow()  // ?
    .runWith(sink)

我已经查看了.groupBy,但它要求我事先知道我不知道的类别数量。此外,我相信它会记住我想避免的所有组。我应该能够在处理后丢弃 (A, 3) 并释放它消耗的资源。

编辑这个问题要求类似的功能,但使用子流。但是,似乎不需要使用 SubFlows,因为我有一个使用statefulMapConcat组合器的解决方案。

4

1 回答 1

1

一种选择是使用statefulMapConcat组合器:

Source(List("A", "A", "B", "B", "B", "C", "C", ""))
      .statefulMapConcat({ () =>
        var lastChar = ""
        var count = 0

        char => if(lastChar == char) {
            count += 1
            List.empty
          } else {
            val charCount = (lastChar, count)
            lastChar = char
            count = 1
            List(charCount)
          }
      })
    .runForeach(println)

但是,这需要在输入流中附加一个元素来标记结束。

输出:

(,0)
(A,2)
(B,3)
(C,2)

感谢@chunjef 在评论中的建议

于 2017-09-10T15:11:15.337 回答