我有一个有序的数据流
A A A B B C C C C ... (very long)
我想将它转换为形式的聚合流(项目,计数):
(A, 3) (B, 2) (C, 4)
我可以在 Akka Streams 中使用哪些运算符?
Source.fromPublisher(publisher)
.aggregateSomehow() // ?
.runWith(sink)
我已经查看了.groupBy,但它要求我事先知道我不知道的类别数量。此外,我相信它会记住我想避免的所有组。我应该能够在处理后丢弃 (A, 3) 并释放它消耗的资源。
编辑:这个问题要求类似的功能,但使用子流。但是,似乎不需要使用 SubFlows,因为我有一个使用statefulMapConcat
组合器的解决方案。