1

我有一个 Spark 作业,它从 HDFS 读取数百万条记录,处理它们,然后以 AVRO 格式写回 HDFS。观察到许多文件(写入)仍处于 .avro.tmp 状态。

我正在使用 Kite SDK 以 AVRO 格式写入数据。环境是CDH 5.5。

可能是因为 Spark 作业在完成读取记录并将它们发送给执行程序(实际上是在写入吗?)

如果是这种情况,我如何确保在所有 .tmp 都转换为 .avro 之前作业不会终止?或者还有什么可能的原因?

4

1 回答 1

0

在遍历所有记录后,我在 call() 方法本身中关闭了编写器后,它就可以工作了。这里的主要缺点是,对于每个分区,我都获得了一个新的写入器,需要找到更好的方法。

     df.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {

        @Override
        public void call(Iterator<Row> iterator) throws Exception {

            final DatasetWriter writer = // obtain writer

            while (iterator.hasNext()) {
                // process the records; write to HDFS using writer
            }

            writer.close(); // this ensures that .avro.tmp is converted to .avro
        }
    });
于 2016-02-05T04:28:25.063 回答