13

我正在尝试将 Spark Streaming 与 Kafka(版本 1.1.0)一起使用,但由于此错误,Spark 作业不断崩溃:

14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)

我从日志中获得的唯一相关信息是:

14/11/21 12:34:18 INFO MemoryStore: Block input-0-1416573258200 stored as bytes to memory (size 85.8 KB, free 2.3 GB)
14/11/21 12:34:18 INFO BlockManagerMaster: Updated info of block input-0-1416573258200
14/11/21 12:34:18 INFO BlockGenerator: Pushed block input-0-1416573258200
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
14/11/21 12:37:35 INFO BlockManagerInfo: Added input-0-1416573258200 in memory on ********:43117 (size: 85.8 KB, free: 2.3 GB)
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

示例代码:

SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
jssc.checkpoint(checkpointDir);

HashMap<String, Integer> topics = new HashMap<String, Integer>();
topics.put(KAFKA_TOPIC, 1);

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("group.id", "spark-streaming-test");
kafkaParams.put("zookeeper.connect", ZOOKEEPER_QUORUM);
kafkaParams.put("zookeeper.connection.timeout.ms", "1000");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairReceiverInputDStream<String, String> kafkaStream = 
  KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String, String> streamPair = kafkaStream.flatMapToPair(...).reduceByKey(...);

我不确定这个问题的原因是什么。

4

3 回答 3

3

检查以下内容。

1)您是否像在

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

您的初始化不正确。

看看下面

例如:recoverableNetworkCount App中的代码

2)您是否启用了属性预写日志“spark.streaming.receiver.writeAheadLog.enable”

3) 在 Streaming UI 中检查流的稳定性。处理时间<批处理间隔。

于 2015-12-08T09:22:56.083 回答
2

你试过吗inputs.persist(StorageLevel.MEMORY_AND_DISK_SER)

例如http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html

于 2015-03-19T08:56:34.487 回答
1

这是由于 Spark 流模型。它收集批处理间隔的数据并将其发送到火花引擎进行处理。Spark 引擎不知道它来自流式系统,并且不会将其传回流式组件。

这意味着没有流控制(背压控制),不像 Storm 或 Flink 这样的原生流系统可以根据处理速率很好地平滑 spout/source 流。

来自https://spark.apache.org/docs/latest/streaming-programming-guide.html 火花流模型

解决此问题的一种选择是手动将处理信息/确认传递回接收器组件 - 当然这也意味着我们需要使用自定义接收器。在这一点上,我们开始构建 Storm/Flink 等提供的开箱即用的功能。

于 2018-12-05T22:08:17.193 回答