3

我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是由于我的 updateFunc 引起的。我想这一定是由于其他原因。我在 --master local[4] 上运行它。

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(1)
}

val state = test.updateStateByKey[Int](updateFunc)

一段时间后,有警告,并且任务大小不断增加。

WARN TaskSetManager:Stage x 包含一个非常大的任务(129 KB)。建议的最大任务大小为 100 KB。

WARN TaskSetManager:Stage x 包含一个非常大的任务(131 KB)。建议的最大任务大小为 100 KB。

4

1 回答 1

0

您的流中有越来越多不同的键,每个键都会导致将新副本1添加到您的状态。

当前 updateStateByKey 扫描每个批处理间隔中的每个键,即使该键没有数据。这会导致 updateStateByKey 的批处理时间随着状态中键的数量而增加,即使数据速率保持固定

有一个解决这个问题的建议

于 2015-11-07T09:47:34.277 回答