2

我正在通过 Scala API 使用 apache flink,并且在某些时候我获得了 DataSet[(Int, Int, Int)]. 使用这些方法的结果 writeAsCSV()writeAsText()出乎意料的。它创建一个目录。该目录具有方法调用的第一个参数作为位置和名称(例如 filePath 。)在该目录中出现两个文件,名称为“1”和“2”。在这些文件中,我可以看到 DataSets 数据。他们似乎将 DataSets 内容划分为这两个文件。尝试重新创建此行为以显示更简洁的代码片段,我无法做到。那就是我目睹了在预期位置创建了一个具有预期名称的文件,并且没有创建目录。val mas = ma_ groupBy(0,1) sum(2) mas.writeAsCsv("c:\flink\mas.csv" )

导致创建一个名为“mas.csv”的目录和其中的两个文件“1”和“2”。什么时候会发生这样的事情?使用了 flink 9.1 本地模式,Windows 7,scala 2.10,eclipse3.0.3

4

1 回答 1

6

这是预期的行为。如果要获取单个输出文件,则需要将接收器的并行度设置为 1。

dataset = dataset.writeAsCsv("filename").setParallelism(1);

对于 DataStream API,你需要插入一个额外rebalane()的来打破算子链。否则,整个链将以 dop=1 执行或setParallelism()可能被忽略。

datastream = datastream.rebalance().writeAsCsv("filename").setParallelism(1);
于 2015-09-15T08:56:57.923 回答