5

我正在运行 24X7 的 spark 流,并使用 updateStateByKey 函数来保存计算的历史数据,例如 NetworkWordCount 示例..

我试图流式传输具有 3lac 记录的文件,每 1500 条记录睡眠 1 秒。我正在使用 3 名工人

  1. 过一段时间updateStateByKey在增长,那么程序抛出如下异常

错误执行程序:任务 ID 1635 java.lang.ArrayIndexOutOfBoundsException 中的异常:3

14/10/23 21:20:43 ERROR TaskSetManager: Task 29170.0:2 failed 1 times; aborting job
14/10/23 21:20:43 ERROR DiskBlockManager: Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/24

14/10/23 21:20:43 ERROR Executor: Exception in task ID 8037
java.io.FileNotFoundException: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/22/shuffle_81_0_1 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)

如何处理?我想 updateStateByKey 应该随着它的快速增长而定期重置,请分享一些关于何时以及如何重置 updateStateByKey 的示例。或者我还有其他问题吗?一些启发。

任何帮助深表感谢。谢谢你的时间

4

1 回答 1

0

你设置了 CheckPoint ssc.checkpoint("path to checkpoint")

于 2015-04-23T11:37:27.150 回答