我正在尝试在集群上运行一个非常大的 RDD 并将其写入 .csv。它是如此之大以至于 .collect() 中断,所以我想将 RDD 保存到每个节点上的片段中,然后以某种方式将它们组合在一起,因为顺序无关紧要。我的想法是将 foreach 与 CSV 打印机功能一起使用,以便每个部分都写入它的值,然后我可以手动将这些部分收集在一起,也许是通过 FTP。
我是一个有一定经验的 Spark 用户,但到目前为止,我从来没有能够让 RDD 的 foreach 方法做任何有用的事情。当我尝试运行文档中给出的示例时,
>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
我的控制台上什么也没有。我相信这是因为“打印”是在单独的节点上执行的,而不是控制台所在的名称节点。不过,在那种情况下,我真的看不出 foreach 函数有什么意义!
如何在不首先调用 collect() 函数的情况下将我的 for each 的结果返回到名称节点?
注意。我也愿意使用 saveAsTextFile() RDD 函数,但我还是无法让它工作!它似乎创建了一个文件夹而不是文本文件,尽管这可能是因为它们也存在于每个节点上而不是集中存在?