1

我正在尝试实现一个 scalaz-stream 通道,该通道累积有关它接收到的事件的统计信息,一旦完成,就会发出最终统计信息。

举一个具体的简化示例:假设您有一个Process[Task, String]其中每个字符串都是一个单词的地方。我想要一个Channel[Task, String, (String, Int)],当应用于初始过程时,会耗尽它,计算每个单词出现的次数,然后发出它。

我意识到这是通过折叠微不足道的:

input.foldMap(w => Map(w -> 1))
     .flatMap(m => Process.emitAll(m.toSeq))
     .maximumBy(_._2)

我正在尝试编写的是标准累加器的集合,然后我可以将我的进程通过管道传递——而不是显式折叠,比如说,我会写:

input.through(wordFrequency)
     .maximumBy(_._2)

不过,我有点不知所措-如果不共享状态,我无法弄清楚如何做到这一点。编写一个Sink累积到 a 的 aMap[String, Int]相当简单,但是一旦 sink 终止,就无法获得 map 的最终状态并发出它。

4

0 回答 0