1

考虑这个简单的流:

Source(1 to 5)
.mapAsync(1) { i =>
  if (i % 3 == 0) Future.failed(new Exception("I don't like 3"))
  else Future.successful(i)
}
.withAttributes(
  ActorAttributes.supervisionStrategy(Supervision.restartingDecider)
)
.runForeach(i => println(s"#$i"))

这实际上打印

#1
#2
#4

这与恢复策略相​​同。我希望流在失败的未来之后重新启动,输出如下

#1
#2
#1
#2
...
  1. 为什么在这种情况下 Resume 和 Restart 策略的行为方式相同?
  2. 如何从头开始重新启动流?
4

1 回答 1

3

问题 1resume :和之间的区别在于restart- 后者 - 失败阶段重新启动,失去所有累积的内部状态。(请参阅文档以供参考)。

在您的情况下,您有一个mapAsync并行度为 1 的阶段,因此您实际上永远不会有任何累积状态。这导致在行为resumerestart是等效的。

问题 2: Akka 流中监督策略的语义与失败的特定阶段有关。一个失败的阶段根本无法重播过去流动的元素,因为它们已经消失了——即没有在任何地方举行。没有任何监督策略可以给你这个。

您正在寻找的是整个流的重新启动,这应该可以通过recoverWithRetries组合器(docs)实现。您可以再次将相同的源 ( Source(1 to 5)) 提供给组合器,让它重放这些元素。

于 2017-03-11T13:41:41.547 回答