1

我正在使用 R/spark 遍历许多 csv 数据文件。每个文件的大约 1% 必须保留(根据某些标准过滤)并与下一个数据文件合并(我使用过union/ rbind)。但是,随着循环的运行,数据的谱系会变得越来越长,因为 spark 会记住所有以前的数据集和filter()-s。

有没有办法在 spark R API 中进行检查点?我了解到 spark 2.1 具有 DataFrames 的检查点,但这似乎不能从 R 中获得。

4

2 回答 2

1

我们在一个相当大的图(数十亿数据)和搜索连接组件上遇到了同样的问题。

我不确定 R 中有哪些适用于您的特定版本,但通常的解决方法是通过“保存”数据然后重新加载数据来打破沿袭。在我们的例子中,我们每 15 次迭代就打破血统:

def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String, iterationCount: Int, numPartitions: Int): Graph[VD, ED] = {
    val path = checkpointDir + "/iter-" + iterationCount
    saveGraph(g, path)
    g.unpersist()
    loadGraph(path, numPartitions)
}
于 2017-03-15T00:44:34.610 回答
0

一个不完整的解决方案/解决方法是将collect()您的数据框转换为 R 对象,然后重新并行化createDataFrame(). 这适用于小数据,但对于较大的数据集,它变得太慢并且抱怨任务太大。

于 2017-03-15T16:44:44.273 回答