I am consuming data from Kafka as shown below:
final JavaPairDStream<String, Row> transformedMessages = rtStream
        .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
        .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32))
        .stateSnapshots();

transformedMessages.foreachRDD(rdd -> {
    // --logic goes here
});
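For context, rtStream and updateDataFunc are created roughly as follows (a simplified sketch, not my exact code; GenericDataModel is my own POJO, and merge, topics and kafkaParams are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

// Direct stream from Kafka (spark-streaming-kafka-0-10)
JavaInputDStream<ConsumerRecord<String, GenericDataModel>> rtStream =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, GenericDataModel>Subscribe(topics, kafkaParams));

// State function: merges the new record into the per-key Row state and emits the updated pair
Function3<String, Optional<GenericDataModel>, State<Row>, Tuple2<String, Row>> updateDataFunc =
        (key, newValue, state) -> {
            Row updated = merge(state.exists() ? state.get() : null, newValue.orNull());
            state.update(updated);
            return new Tuple2<>(key, updated);
        };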
I have four workers and multiple executors for this application, and I am trying to test Spark's fault tolerance.
Since we are using mapWithState, Spark checkpoints the data to HDFS, so if any executor/worker goes down we should be able to recover the lost data (the data lost on the dead executor) and continue with the remaining executors/workers.
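For reference, the checkpoint directory is set on the streaming context roughly like this (a simplified sketch; the app name and batch interval are illustrative, not my actual values):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("kafka-state-app");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));
// mapWithState requires a checkpoint directory; this is where the state RDDs
// under .../sparkCheckpointData/<uuid>/rdd-NNNN get written
jssc.checkpoint("hdfs://mycluster/user/user1/sparkCheckpointData");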
So I killed one of the worker nodes to see whether the application would keep running smoothly, but instead I get a FileNotFoundException for a file in HDFS, as shown below:
This is a bit strange, since Spark periodically checkpoints data to HDFS, so why is it unable to find the file? Obviously HDFS is not deleting any data on its own, so why does this exception occur? Or am I missing something here?
[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)
Further update: I found that the RDD Spark is trying to look up in HDFS has already been deleted by the "ReliableRDDCheckpointData" process, which created a new RDD for the checkpoint data. Somehow the DAG is still pointing to the old RDD. If there are still references to this data, it should not have been deleted.
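For what it's worth, the path from the exception can be checked against HDFS directly with something like the following (a minimal sketch using the Hadoop FileSystem API; the path is copied from the stack trace above), and the rdd-1005 directory is indeed gone:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Connect to the same HDFS namespace the checkpoint directory lives on
FileSystem fs = FileSystem.get(new URI("hdfs://mycluster"), new Configuration());
Path rddDir = new Path("/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005");
System.out.println("rdd-1005 exists? " + fs.exists(rddDir));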