如果在 GCS Bucket 中同步,Data Fusion Pipeline 会在输出时为我们提供一个或多个部分文件。我的问题是我们如何将这些零件文件合并为一个并给它们起一个有意义的名称?
问问题
183 次
1 回答
2
Data Fusion
转换在执行或作业的Dataproc
集群中运行。您的最终输出被拆分为多个文件,因为作业基于 HDFS 分区对您的数据进行分区(这是 Spark/Hadoop 的默认行为)。Spark
MapReduce
在编写 Spark 脚本时,您可以操纵这种默认行为并产生不同的输出。但是,Data Fusion 旨在抽象代码层并为您提供使用完全托管的数据集成器的体验。使用拆分文件应该不是问题,但是如果您确实需要合并它们,我建议您使用以下方法:
在 Pipeline Studio 顶部单击Hub -> Plugins,搜索Dynamic Spark Plugin,单击Deploy然后在Finish(您可以忽略 JAR 文件)
返回到您的管道,在接收器部分选择Spark。
用Spark插件替换您的GCS插件
在您的 Spark 插件中,将Compile at Deployment Time设置为false并将代码替换为一些您想要的 Spark 代码。例如,下面的代码是硬编码的,但可以工作:
def sink(df: DataFrame) : Unit = { new_df = df.coalesce(1) new_df.write.format("csv").save("gs://your/path/") }
此函数从您的管道接收数据作为Dataframe。coalesce函数将分区数减少到 1,最后一行将其写入 GCS。
部署您的管道,它将准备好运行
于 2020-09-01T13:50:13.340 回答