I am running into a problem with a Spark Streaming job: I am trying to use broadcast variables, mapWithState, and checkpointing together.

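Here is how each one is used:

  • Since I have to pass some connection objects (which are not serializable) to the executors, I am using org.apache.spark.broadcast.Broadcast
  • Since we have to maintain some cached information, I am using a stateful stream with mapWithState
  • I am also using checkpointing on my streaming context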

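I also need to pass the broadcast connection object into mapWithState in order to fetch some data from an external source.

The flow works fine when the context is newly created. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.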

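I have put a small code snippet on GitHub, based on an example from asyncified.io, to reproduce the issue:

  • My broadcast logic is yuvalitzchakov.utils.KafkaWriter.scala
  • The dummy logic of the application is yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala

A dummy snippet of the code: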

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-stateful-example")

...
val prop = new Properties()
...

val config: Config = ConfigFactory.parseString(prop.toString)
val sc = new SparkContext(sparkConf)
val ssc = StreamingContext.getOrCreate(checkpointDir, () =>  {

    println("creating context newly")

    clearCheckpoint(checkpointDir)

    val streamingContext = new StreamingContext(sc, Milliseconds(batchDuration))
    streamingContext.checkpoint(checkpointDir)

    ...
    val kafkaWriter = SparkContext.getOrCreate().broadcast(kafkaErrorWriter)
    ...
    val stateSpec = StateSpec.function((key: Int, value: Option[UserEvent], state: State[UserSession]) =>
        updateUserEvents(key, value, state, kafkaWriter)).timeout(Minutes(jobConfig.getLong("timeoutInMinutes")))

    kafkaTextStream
    .transform(rdd => {
        offsetsQueue.enqueue(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
        rdd
    })
    .map(deserializeUserEvent)
    .filter(_ != UserEvent.empty)
    .mapWithState(stateSpec)
    .foreachRDD { rdd =>
        ...
        // some logic
        ...
    }

    streamingContext
})

ssc.start()
ssc.awaitTermination()


def updateUserEvents(key: Int,
                     value: Option[UserEvent],
                     state: State[UserSession],
                     kafkaWriter: Broadcast[KafkaWriter]): Option[UserSession] = {

    ...
    kafkaWriter.value.someMethodCall()
    ...
}

I get the following error when

kafkaWriter.value.someMethodCall()

is executed:

17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144)
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150)
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78)
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

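Basically, kafkaWriter is the broadcast variable, and kafkaWriter.value should return the object we broadcast. Instead it returns a SerializableConfiguration, which cannot be cast to the expected type.

Thanks in advance for your help!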

1 Answer

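Broadcast variables cannot be used with mapWithState (or with transformations in general) if the job needs to recover from the checkpoint directory in Spark Streaming. In that case a broadcast can only be used inside output operations, because it has to be lazily initialized, and that requires the Spark context.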
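
One plausible reading of the ClassCastException in the question, offered as an assumption rather than a statement about Spark internals: a checkpointed closure keeps only a reference (an id) to the broadcast, and after a restart that id resolves to whatever the new context registered under it, here apparently a SerializableConfiguration. The toy model below is plain Java with hypothetical classes (Registry, Handle, KafkaWriterStub, ConfigurationStub); none of them are Spark APIs.

```java
import java.util.HashMap;
import java.util.Map;

class StaleBroadcastIdSketch {

    // Toy stand-in for the per-application table of broadcast values (not Spark code).
    static final class Registry {
        private final Map<Long, Object> values = new HashMap<>();
        private long nextId = 0;

        long register(Object value) {
            long id = nextId++;
            values.put(id, value);
            return id;
        }

        Object lookup(long id) {
            return values.get(id);
        }
    }

    // Toy stand-in for a broadcast handle: it remembers only the id.
    static final class Handle<T> {
        final long id;

        Handle(long id) {
            this.id = id;
        }

        @SuppressWarnings("unchecked")
        T value(Registry registry) {
            // Unchecked cast: because of type erasure nothing fails here.
            return (T) registry.lookup(id);
        }
    }

    static final class KafkaWriterStub { }     // plays the role of KafkaWriter
    static final class ConfigurationStub { }   // plays the role of SerializableConfiguration

    static boolean castFailsAfterRestart() {
        // First run: the writer is registered and the handle captures its id.
        Registry firstRun = new Registry();
        Handle<KafkaWriterStub> handle = new Handle<>(firstRun.register(new KafkaWriterStub()));

        // Restart: a fresh registry whose ids start again at 0,
        // and something else gets registered first.
        Registry secondRun = new Registry();
        secondRun.register(new ConfigurationStub());

        try {
            // The compiler inserts the real cast at this use site.
            KafkaWriterStub writer = handle.value(secondRun);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(castFailsAfterRestart()); // prints true
    }
}
```

As in the stack trace, the failure shows up where the value is used, not where it is looked up. The singleton holders below avoid the stale reference by asking the live context for a fresh broadcast instead of reusing one captured before the restart.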

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;

// Lazily instantiated singleton that holds the blacklist broadcast
class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaWordBlacklist.class) {
                if (instance == null) {
                    List<String> wordBlacklist = Arrays.asList("a", "b", "c");
                    instance = jsc.broadcast(wordBlacklist);
                }
            }
        }
        return instance;
    }
}

// Lazily instantiated singleton that holds the accumulator
class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaDroppedWordsCounter.class) {
                if (instance == null) {
                    instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
                }
            }
        }
        return instance;
    }
}

// In the streaming job: an output operation can reach the live context via rdd.context()
wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast
    Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(wordCount -> {
        if (blacklist.value().contains(wordCount._1())) {
            droppedWordsCounter.add(wordCount._2());
            return false;
        } else {
            return true;
        }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
});
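
The double-checked locking used by both holder classes can be tried without a cluster. This is a minimal sketch in plain Java under stated assumptions: LazyHolder is a hypothetical generic helper, not a Spark class, and in a real job the factory would wrap a call such as jsc.broadcast(...).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyHolderSketch {

    // Hypothetical helper (not part of Spark): builds its value on first use only.
    static final class LazyHolder<T> {
        private volatile T instance = null;

        T getInstance(Supplier<T> factory) {
            T local = instance;              // one volatile read on the fast path
            if (local == null) {
                synchronized (this) {
                    local = instance;        // re-check under the lock
                    if (local == null) {
                        local = factory.get();
                        instance = local;
                    }
                }
            }
            return local;
        }
    }

    public static void main(String[] args) {
        AtomicInteger creations = new AtomicInteger();
        LazyHolder<List<String>> blacklist = new LazyHolder<>();

        // Stand-in for the expensive step (in Spark: creating the broadcast).
        Supplier<List<String>> factory = () -> {
            creations.incrementAndGet();
            return Arrays.asList("a", "b", "c");
        };

        List<String> first = blacklist.getInstance(factory);
        List<String> second = blacklist.getInstance(factory);

        System.out.println(first == second);  // prints true: the same instance is reused
        System.out.println(creations.get());  // prints 1: the factory ran once
    }
}
```

In the holder classes above the field is static, so after a driver restart it starts out null in the new JVM and the first call builds the broadcast again from the current context.
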
answered 2018-03-04T15:49:17.927