3

我正在运行 24/7 的火花流并使用 updateStateByKey 是否可以 24/7 运行火花流?如果是,updateStateByKey 不会变大,如何处理?当我们 24/7 运行时,我们是否必须定期重置/删除 updateStateByKey 如果不是如何以及何时重置它?还是 Spark 以分布式方式处理?如何动态地创建内存/存储。

当 updateStateByKey 增长时,我收到以下错误

Array out of bound exception

Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4

如何处理这个..如果有任何文档请指出我?我完全被卡住了,非常感谢您的帮助..感谢您的宝贵时间

4

3 回答 3

5

在 Java 中使用 Optional.absent() 和在 Scala 中使用 None 来删除键。工作示例可以在http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/找到。

于 2015-03-11T22:39:44.247 回答
0

使用 None 更新密钥会将其从 spark 中删除。如果你想保留key一定的时间,你可以给key附加一个过期时间,每批次检查一次。

例如,这是按分钟计算记录的代码:

val counts = lines.map(line => (currentMinute, 1))
val countsWithState = counts updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  if (values.isEmpty) { // every key will be iterated, even if there's no record in this batch
    println("values is empty")
    None // this will remove the key from spark
  } else {
    println("values size " + values.size)
    Some(state.sum + values.sum)
  }
}
于 2014-11-04T06:27:23.677 回答
0

pyspark : updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None)

返回一个新的“状态” DStream,其中每个键的状态通过对键的先前状态和键的新值应用给定函数来更新。

@param updateFunc:状态更新函数。如果这个函数返回None,那么对应的状态键值对将被淘汰。

updateFunc 方法返回 None,状态键值对删除;

于 2018-08-10T08:05:22.987 回答