
When I try to reload my Spark Streaming application from its checkpoint directory, I get the following exception:

java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: maprfs:/mapr/cellos-mapr/user/mbazarganigilani/checkpoints/22237996-da79-4f13-b142-3ab112b7c374/rdd-1009
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.rdd.ReliableCheckpointRDD.<init>(ReliableCheckpointRDD.scala:46)
    at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1226)
    at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1226)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
    at org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1225)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:515)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:151)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:158)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:864)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:864)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
    at UnionStream$.main(UnionStreaming.scala:636)
    at UnionStream.main(UnionStreaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

Is there any way to recover from this exception, for example by reloading from earlier checkpoint data instead?
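For reference, the restore is triggered by a StreamingContext.getOrCreate call (UnionStreaming.scala:636 in the trace above). The only workaround I've come up with is catching the exception and falling back to a freshly built context, along these lines (a rough sketch; createContext, the app name, and the batch interval are placeholders for my actual setup):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object UnionStream {
      val checkpointDir =
        "maprfs:/mapr/cellos-mapr/user/mbazarganigilani/checkpoints"

      // Builds a brand-new context; also invoked by getOrCreate when no
      // checkpoint exists yet.
      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("UnionStream")
        val ssc = new StreamingContext(conf, Seconds(10))
        // ... set up the DStream graph here ...
        ssc.checkpoint(checkpointDir)
        ssc
      }

      def main(args: Array[String]): Unit = {
        val ssc =
          try {
            // Normal path: restore the context from checkpoint data.
            StreamingContext.getOrCreate(checkpointDir, createContext _)
          } catch {
            case _: IllegalArgumentException =>
              // Checkpoint data is incomplete (the rdd-* directory is
              // gone), so start from scratch, losing all streaming state
              // accumulated so far.
              createContext()
          }
        ssc.start()
        ssc.awaitTermination()
      }
    }

This gets the job running again, but it throws away all state (window and updateStateByKey data), which is exactly what I'd like to avoid. The createOnError flag on getOrCreate doesn't seem to help here either, since as far as I can tell it only guards reading the checkpoint file itself, not reconstructing the context from it.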
