4

我正在我的计算机上本地测试 Apache Spark 上的网络抓取/抓取程序。

该程序使用了一些 RDD 转换,这些转换采用了偶尔失败的 volatile 函数。(该函数的目的是将 URL 链接转换为网页,有时它调用的无头浏览器只是停电或过载 - 我无法避免这种情况)

我听说 Apache Spark 具有强大的故障转移和重试功能,任何不成功的转换或丢失的数据都可以从它可以找到的任何资源从头开始重新计算(听起来很神奇吧?)所以我没有在我的代码。

这是我的火花配置:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

不幸的是,在大多数阶段和个别任务成功后,这项工作失败了。最新登录控制台显示:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

看起来Spark在失败一次后只是懦弱地放弃了。如何正确配置它以使其更坚韧?

(我的程序可以从https://github.com/tribbloid/spookystuff下载,对于稀缺和杂乱无章的代码/文档,我只是开始它几天)

ADD:如果你想自己尝试,下面的代码可以演示这个问题:

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","400000")
val sc = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 8
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
  val x = java.lang.Math.random()
  if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
  x
}.reduce(_ + _)
sc.stop()
println("finished")
}

应该注意的是,相同的 IllegalStateException 在这篇文章中被重试了 32 次: Apache Spark Throws java.lang.IllegalStateException: unread block data

4

3 回答 3

9

我知道这是一个非常古老的问题,但我遇到了完全相同的问题,并且在寻找解决方案时遇到了这个问题。

有 3 种主 URL 格式可以在本地模式下提交 Spark 应用程序:

  • local- 一个线程(无并行性),无重试
  • local[K](或local[*]) - 使用K(或核心数)工作线程并设置task.maxFailures1参见此处

  • local[K, F](或local[*, F])- 设置task.maxFailures=F, 这就是我们所追求的。

有关详细信息,请参阅Spark 文档

于 2017-12-08T16:21:42.027 回答
7

转发一下最权威的答案:

如果这对本地模式有用,我们应该打开一个 JIRA 来记录设置或改进它(我更喜欢添加 spark.local.retries 属性而不是特殊的 URL 格式)。我们最初对除单元测试之外的所有内容都禁用了它,因为 90% 的本地模式下的异常意味着应用程序中的问题,我们宁愿让用户立即调试,而不是多次重试任务并让他们担心关于为什么他们得到这么多错误。

马太

于 2014-06-10T05:46:48.313 回答
0

这对我有用 -

sparkConfig
.set("spark.task.maxFailures", "2")
.set("spark.master", "local[2, 2]")

我必须同时设置两者才能看到我的任务失败(同时引发异常),然后在本地测试环境中重新尝试。

于 2019-01-28T09:52:53.407 回答