1

我正在尝试使用scalaz-stream同时处理多个文件,将单个函数应用于文件中的每一行,跨越所有文件。具体来说,假设我有一个接受字符串列表的函数

def f(lines: Seq[String]): Something = ???

还有几个文件,第一个:

foo1
foo2
foo3

第二:

bar1
bar2
bar3

整个过程的结果应该是:

List(
  f(Seq("foo1", "bar1")), 
  f(Seq("foo2", "bar2")), 
  f(Seq("foo3", "bar3"))
)

(或者更有可能直接写入其他文件)

事先不知道文件的数量,并且不同文件之间的行数可能会有所不同,但是我可以使用默认值填充(在运行时)较短文件的末端,或者删除较长的文件。

所以本质上,我需要一种方法将 a Seq[Process[Task, String]](通过类似的东西获得io.linesR)组合成一个Process[Task, Seq[String]].

实现这一目标的最简单方法是什么?

或者,更一般地说,我如何将n实例组合Process[F, I]成一个实例Process[F, Seq[I]]

我确信为此目的有一些标准组合器,但我无法弄清楚......

谢谢。

4

1 回答 1

4

该组合器尚不存在,但您可以添加它。我认为这将是这样的:

def zipN[F[_], A](xs: Seq[Process[F,A]]): Process[F,Seq[A]] = 
  if (xs.isEmpty) Process.halt
  else xs.map(_ map (Vector(_))).reduceLeft(_.zipWith(_)(_ ++ _))

您还可以添加zipAllN, 它采用一个值来填充序列(并且使用zipAll, 和alignN,它允许流在耗尽时“退出”输出过程。(因此输出序列可能会变短。)

我建议您将其实现为“平衡”减少而不是左或右减少,因为这样会更有效。

如果您最终真正实现了这一点,请务必提交拉取请求 + 测试!

于 2013-11-22T03:29:57.660 回答