I'm facing an issue with a Spark Streaming job, where I'm trying to use broadcast, mapWithState and checkpointing together.

Here is the usage:
- Since I have to pass some connection objects (which are not serializable) to the executors, I'm using org.apache.spark.broadcast.Broadcast (a sketch of that wrapper idea follows this list)
- Since we have to maintain some cached information, I'm using stateful streams with mapWithState
- I'm also using checkpointing on my streaming context
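
For context, the usual way a non-serializable connection ends up inside a broadcast is via a serializable wrapper that rebuilds the connection lazily on each executor. A minimal sketch of that idea, using a hypothetical JDBC-based ConnectionWriter rather than the repo's actual KafkaWriter:

// Hypothetical sketch, not the actual yuvalitzchakov.utils.KafkaWriter:
// the wrapper itself is serializable, while the non-serializable connection
// is marked @transient and rebuilt lazily on each executor on first use.
class ConnectionWriter(connect: () => java.sql.Connection) extends Serializable {
  @transient private lazy val connection = connect()

  def someMethodCall(): Unit = {
    val stmt = connection.createStatement()
    try stmt.execute("SELECT 1") finally stmt.close()
  }
}

// On the driver, only the wrapper (plus its connect function) is serialized
// into the broadcast; the connection itself never crosses the wire:
// val writer = sc.broadcast(new ConnectionWriter(
//   () => java.sql.DriverManager.getConnection(jdbcUrl)))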
I also need to pass the broadcast connection object into mapWithState, in order to fetch some data from an external source.

The flow works perfectly well when the context is newly created. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.

I've put a small code snippet on GitHub, based on the example from asyncified.io, to reproduce the issue:

- My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
- The dummy logic of the application is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala

Dummy snippet of the code:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-stateful-example")
...
val prop = new Properties()
...
val config: Config = ConfigFactory.parseString(prop.toString)
val sc = new SparkContext(sparkConf)
val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
  println("creating context newly")
  clearCheckpoint(checkpointDir)
  val streamingContext = new StreamingContext(sc, Milliseconds(batchDuration))
  streamingContext.checkpoint(checkpointDir)
  ...
  val kafkaWriter = SparkContext.getOrCreate().broadcast(kafkaErrorWriter)
  ...
  val stateSpec = StateSpec.function((key: Int, value: Option[UserEvent], state: State[UserSession]) =>
    updateUserEvents(key, value, state, kafkaWriter)).timeout(Minutes(jobConfig.getLong("timeoutInMinutes")))

  kafkaTextStream
    .transform(rdd => {
      offsetsQueue.enqueue(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
      rdd
    })
    .map(deserializeUserEvent)
    .filter(_ != UserEvent.empty)
    .mapWithState(stateSpec)
    .foreachRDD { rdd =>
      ...
      some logic
      ...
    }

  streamingContext
})
ssc.start()
ssc.awaitTermination()
def updateUserEvents(key: Int,
                     value: Option[UserEvent],
                     state: State[UserSession],
                     kafkaWriter: Broadcast[KafkaWriter]): Option[UserSession] = {
  ...
  kafkaWriter.value.someMethodCall()
  ...
}
I get the following error when

kafkaWriter.value.someMethodCall()

is executed:
17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Basically, kafkaWriter is the broadcast variable, and kafkaWriter.value should return the object we broadcast, but instead it returns a SerializableConfiguration that cannot be cast to the desired KafkaWriter object.
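
For what it's worth, the Spark Streaming programming guide ("Accumulators, Broadcast Variables, and Checkpoints") says that broadcast variables cannot be recovered from a checkpoint, and suggests a lazily instantiated singleton that re-creates the broadcast after a restart. My reading of that pattern, adapted to this writer (the holder object and its names are mine, not from the repo), is roughly:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Sketch of the lazily instantiated singleton pattern from the Spark docs;
// KafkaWriterHolder and getInstance are hypothetical names.
object KafkaWriterHolder {

  @volatile private var instance: Broadcast[KafkaWriter] = _

  def getInstance(sc: SparkContext): Broadcast[KafkaWriter] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // Re-created on first use after a restart, instead of being
          // restored (incorrectly) from the checkpointed closure.
          instance = sc.broadcast(new KafkaWriter(/* config */))
        }
      }
    }
    instance
  }
}

In the docs' example the holder is called inside foreachRDD with rdd.sparkContext, so the broadcast handle is obtained on the driver for each batch rather than baked into the checkpointed closure. Whether the same trick can be wired into a StateSpec.function is exactly what I'm unsure about here.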
Thanks in advance for the help!