我正在运行 Spark 2.3.1 独立集群。我的工作是每 2 分钟从 Kafka 小批量中消耗一次,并将聚合写入某个商店。作业如下所示:
val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
stream.map(x=> Row(...))
.flatMap(r=>...)
.map(r=> (k,r))
.reduceByKey((r1, r2) => r)
.map { case (_, v) => v}
.foreachRDD { (rdd, time) => // write data}
这项工作还不错,运行了将近一周。但是后来,它堆积了 - 我看到批次正在运行,但在 Kafka 中开始出现滞后,并且偏移量也变得没有变化。
在日志中,有以下消息,我可以看到它们每 2 分钟重复一次(恕我直言,批处理间隔)。所以我认为这项工作是由于重试而挂起的。
[org.apache.spark.scheduler.TaskSetManager] Starting task 14.0 in stage 13042.0 (TID 7732814, 10.9.6.53, executor 5, partition 14, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 0.0 in stage 13042.0 (TID 7732815, 10.9.6.77, executor 3, partition 0, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 1.0 in stage 13042.0 (TID 7732816, 10.9.6.61, executor 2, partition 1, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 4.0 in stage 13042.0 (TID 7732812, 10.9.6.65, executor 1, partition 4, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 28.0 in stage 13042.0 (TID 7732813, 10.9.6.62, executor 4, partition 28, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSchedulerImpl] Adding task set 13042.0 with 186 tasks
[org.apache.spark.scheduler.DAGScheduler] Submitting 186 missing tasks from ShuffleMapStage 13042 (MapPartitionsRDD[117358] at map at MyAgg.scala:373) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
重启解决了这个问题。
我的问题 - 任何想法为什么会发生这种情况,我可以做什么/配置以防止再次发生这种情况?