当我尝试从检查点目录重新加载我的 Spark Streaming 应用程序时,出现以下异常:
java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: maprfs:/mapr/cellos-mapr/user/mbazarganigilani/checkpoints/22237996-da79-4f13-b142-3ab112b7c374/rdd-1009
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.rdd.ReliableCheckpointRDD.<init>(ReliableCheckpointRDD.scala:46)
at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1226)
at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1226)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
at org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1225)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:515)
at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:151)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:158)
at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:864)
at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:864)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
at UnionStream$.main(UnionStreaming.scala:636)
at UnionStream.main(UnionStreaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
我想知道是否有办法解决(或绕过)此异常,例如从更早的检查点数据恢复应用程序?