41

介绍

Scala Future2.10现在 2.9.3中的新功能)是一个应用函子,这意味着如果我们有一个可遍历的类型 F,我们可以将一个F[A]和一个函数A => Future[B]转换为一个Future[F[B]].

此操作在标准库中作为Future.traverse. Scalaz 7还提供了一个更通用的方法,如果我们从traverse中导入 applicative functor 实例,我们可以在这里使用它。Futurescalaz-contrib

这两种traverse方法在流的情况下表现不同。标准库遍历在返回之前消耗流,而 Scalaz立即返回未来

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

正如Leif Warner在这里所观察到的,还有另一个不同之处。标准库traverse立即启动所有异步操作,而 Scalaz 启动第一个,等待它完成,启动第二个,等待它,依此类推。

流的不同行为

通过编写一个函数,该函数将为流中的第一个值休眠几秒钟,很容易显示第二个区别:

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

现在Future.traverse(Stream(1, 2))(toFuture)将打印以下内容:

Starting 1!
Starting 2!
Done 2!
Done 1!

Scalaz 版本 ( Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!

这可能不是我们想要的。

对于列表?

奇怪的是,这两个遍历在列表上的这方面表现相同——Scalaz 不会在开始下一个未来之前等待一个未来完成。

另一个未来

Scalaz 还包括自己的concurrent包和自己的期货实现。我们可以使用与上述相同的设置:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

然后我们得到 Scalaz 在列表和流的流上的行为:

Starting 1!
Done 1!
Starting 2!
Done 2!

也许不那么令人惊讶的是,遍历无限流仍然会立即返回。

问题

此时我们确实需要一个表格来总结,但必须要有一个列表:

  • 带标准库遍历的流:在返回前消费;不要等待每个未来。
  • 带有Scalaz遍历的流:立即返回;等待每个未来完成。
  • Scalaz 带有流的期货:立即返回;等待每个未来完成。

和:

  • 具有标准库遍历的列表:不要等待。
  • 带有 Scalaz 遍历的列表:不要等待。
  • 带有列表的 Scalaz 期货:请等待每个未来完成。

这有道理吗?列表和流上的此操作是否存在“正确”行为?是否有某种原因,“最异步”的行为——即在返回之前不要使用集合,并且不要等待每个未来完成后再继续下一个——这里没有表示?

4

2 回答 2

1

我无法全部回答,但我尝试了一些部分:

是否有某种原因,“最异步”的行为——即在返回之前不要使用集合,并且不要等待每个未来完成后再继续下一个——这里没有表示?

如果您有依赖计算和有限数量的线程,您可能会遇到死锁。例如,您有两个期货取决于第三个(期货列表中的所有三个)并且只有两个线程,您可能会遇到前两个期货阻塞所有两个线程而第三个永远不会执行的情况。(当然,如果你的池大小是1,即zou一个接一个地执行计算,你可以得到类似的情况)

为了解决这个问题,你需要每个未来一个线程,没有任何限制。这适用于小型期货清单,但不适用于大型期货。所以如果你并行运行,你会遇到这样一种情况,小例子在所有情况下都会运行,而更大的例子会死锁。(示例:开发人员测试运行良好,生产死锁)。

列表和流上的此操作是否存在“正确”行为?

我认为期货是不可能的。如果您了解更多的依赖关系,或者当您确定计算不会阻塞时,可能会有更多并发的解决方案。但是执行期货清单看起来我“被设计破坏了”。最好的解决方案似乎是一个,对于死锁的小例子(即一个接一个地执行一个 Future),它已经失败了。

带有列表的 Scalaz 期货:请等待每个未来完成。

我认为 scalaz 在内部使用理解来进行遍历。对于理解,不能保证计算是独立的。所以我猜 Scalaz 在这里对推导做正确的事情:一个接一个地进行计算。在期货的情况下,这将始终有效,因为您的操作系统中有无限的线程。

所以换句话说:你看到的只是一个关于理解(必须)如何工作的工件。

我希望这有点道理。

于 2013-09-20T08:07:18.190 回答
1

如果我正确理解了这个问题,我认为这真的归结为流与列表的语义。

遍历列表可以满足我们对文档的期望:

使用提供的函数将 a 转换TraversableOnce[A]为 a 。这对于执行并行映射很有用。例如,要将函数并行应用于列表的所有项目:Future[TraversableOnce[B]]A => Future[B]

对于流,由开发人员决定他们希望它如何工作,因为它依赖于比编译器更多的流知识(流可以是无限的,但类型系统不知道它)。如果我的流正在从文件中读取行,我想首先使用它,因为逐行链接期货实际上不会并行化事情。在这种情况下,我想要并行方法。

另一方面,如果我的流是一个无限列表,生成顺序整数并寻找大于某个大数的第一个素数,则不可能在一次扫描中首先消耗流(Future需要链式方法,我们d 可能想从流中运行批次)。

与其试图找出一种规范的方法来处理这个问题,我想知道是否缺少有助于使不同情况更明确的类型。

于 2016-07-13T08:28:07.477 回答