我正在使用 R/spark 遍历许多 csv 数据文件。每个文件的大约 1% 必须保留(根据某些标准过滤)并与下一个数据文件合并(我使用过union
/ rbind
)。但是,随着循环的运行,数据的谱系会变得越来越长,因为 spark 会记住所有以前的数据集和filter()
-s。
有没有办法在 spark R API 中进行检查点?我了解到 spark 2.1 具有 DataFrames 的检查点,但这似乎不能从 R 中获得。
我正在使用 R/spark 遍历许多 csv 数据文件。每个文件的大约 1% 必须保留(根据某些标准过滤)并与下一个数据文件合并(我使用过union
/ rbind
)。但是,随着循环的运行,数据的谱系会变得越来越长,因为 spark 会记住所有以前的数据集和filter()
-s。
有没有办法在 spark R API 中进行检查点?我了解到 spark 2.1 具有 DataFrames 的检查点,但这似乎不能从 R 中获得。
我们在一个相当大的图(数十亿数据)和搜索连接组件上遇到了同样的问题。
我不确定 R 中有哪些适用于您的特定版本,但通常的解决方法是通过“保存”数据然后重新加载数据来打破沿袭。在我们的例子中,我们每 15 次迭代就打破血统:
def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String, iterationCount: Int, numPartitions: Int): Graph[VD, ED] = {
val path = checkpointDir + "/iter-" + iterationCount
saveGraph(g, path)
g.unpersist()
loadGraph(path, numPartitions)
}
一个不完整的解决方案/解决方法是将collect()
您的数据框转换为 R 对象,然后重新并行化createDataFrame()
. 这适用于小数据,但对于较大的数据集,它变得太慢并且抱怨任务太大。