我正在我的计算机上本地测试 Apache Spark 上的网络抓取/抓取程序。
该程序使用了一些 RDD 转换,这些转换采用了偶尔失败的 volatile 函数。(该函数的目的是将 URL 链接转换为网页,有时它调用的无头浏览器只是停电或过载 - 我无法避免这种情况)
我听说 Apache Spark 具有强大的故障转移和重试功能,任何不成功的转换或丢失的数据都可以从它可以找到的任何资源从头开始重新计算(听起来很神奇吧?)所以我没有在我的代码。
这是我的火花配置:
val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough
不幸的是,在大多数阶段和个别任务成功后,这项工作失败了。最新登录控制台显示:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...
看起来Spark在失败一次后只是懦弱地放弃了。如何正确配置它以使其更坚韧?
(我的程序可以从https://github.com/tribbloid/spookystuff下载,对于稀缺和杂乱无章的代码/文档,我只是开始它几天)
ADD:如果你想自己尝试,下面的代码可以演示这个问题:
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","400000")
val sc = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 8
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
val x = java.lang.Math.random()
if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
x
}.reduce(_ + _)
sc.stop()
println("finished")
}
应该注意的是,相同的 IllegalStateException 在这篇文章中被重试了 32 次: Apache Spark Throws java.lang.IllegalStateException: unread block data