4

假设我需要对一个大文本文件中的每一行应用两个函数f: String => A并最终创建一个.g: A => BB

由于文件很大而且f价格g昂贵,我想让处理并发。我可以使用“并行集合”并执行类似io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))的操作,但它不会同时执行读取文件f、和g

在这个例子中实现并发的最佳方式是什么?

4

2 回答 2

14

首先,重要说明:不要使用.paron,List因为它需要复制所有数据(因为List只能顺序读取)。相反,使用类似的东西Vector.par转换可以在没有复制的情况下发生。

似乎您以错误的方式思考并行性。以下是会发生的情况:

如果你有这样的文件:

0
1
2
3
4
5
6
7
8
9

和功能fg

def f(line: String) = {
  println("running f(%s)".format(line))
  line.toInt
}

def g(n: Int) = {
  println("running g(%d)".format(n))
  n + 1
}

然后你可以这样做:

io.Source.fromFile("data.txt").getLines.toIndexedSeq[String].par.map(l => g(f(l)))

并获得输出:

running f(3)
running f(0)
running f(5)
running f(2)
running f(6)
running f(1)
running g(2)
running f(4)
running f(7)
running g(4)
running g(1)
running g(6)
running g(3)
running g(5)
running g(0)
running g(7)
running f(9)
running f(8)
running g(9)
running g(8)

因此,即使整个g(f(l))操作都发生在同一个线程上,您也可以看到每一行都可以并行处理。因此,许多fandg操作可以在不同的线程上同时发生,但特定行的and将按f顺序发生。g

毕竟,这是您应该期望的方式,因为实际上它无法读取行、运行fg并行运行。例如,如果该行尚未被读取,它如何g在输出上执行?f

于 2012-12-12T15:52:41.013 回答
3

您可以map使用Future

val futures = io.Source.fromFile(fileName).getLines.map{ s => Future{ stringToA(s) }.map{ aToB } }.toIndexedSeq

val results = futures.map{ Await.result(_, 10 seconds) }
// alternatively:
val results = Await.result(Future.sequence(futures), 10 seconds)
于 2012-12-12T15:51:16.797 回答