1

我需要根据过滤条件将输入文件拆分为 2 个输出文件。我的输出目录应如下所示:

/hdfs/base/dir/matched/YYYY/MM/DD
/hdfs/base/dir/notmatched/YYYY/MM/DD

我正在使用MultipleOutputs类在我的地图函数中拆分我的数据。在我的驱动程序类中,我使用如下:

FileOutputFormat.setOutputPath(job, new Path("/hdfs/base/dir"));

在 Mapper 中,我在下面使用:

mos.write(key, value, fileName); // File Name is generating based on filter criteria

该程序可以正常运行一天。但是在第二天我的程序没有说:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://nameservice1/hdfs/base/dir already exists

第二天我不能使用不同的基本目录。

我该如何处理这种情况?

注意:我不想读取输入以创建 2 个单独的文件。

4

2 回答 2

1

创建自定义 o/p 格式类,如下所示

package com.visa.util;

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class CostomOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>{

    @Override
    public void checkOutputSpecs(JobContext arg0) throws IOException {
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException {
        return super.getOutputCommitter(arg0);
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
        return super.getRecordWriter(arg0);
    }

}

并在驱动程序类中使用它:

job.setOutputFormatClass(CostomOutputFormat.class);

这将跳过检查 o/p 目录是否存在。

于 2015-04-08T07:54:54.010 回答
-1

您可以在输出值中有一个标志列。稍后您可以处理输出并按标志列拆分它。

于 2015-04-08T07:02:04.070 回答