
I am generating a CSV file in my map function, so that each map task produces one CSV file. This is a side effect, not the mapper's output. I name these files like filename_inputkey. However, when I run the application on a single-node cluster, only one file is generated. My input has 10 lines, and my understanding is that there will be 10 mapper tasks and 10 files generated. Let me know if I am thinking about this the wrong way.

Here is my GWASInputFormat class:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class GWASInputFormat extends FileInputFormat<LongWritable, GWASGenotypeBean>{

@Override
public RecordReader<LongWritable, GWASGenotypeBean> getRecordReader(org.apache.hadoop.mapred.InputSplit input, JobConf job, Reporter reporter) throws IOException {
    return new GWASRecordReader(job, (FileSplit) input);
}

}

Here is the GWASRecordReader:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class GWASRecordReader implements RecordReader<LongWritable, GWASGenotypeBean>{

private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;

@Override
public void close() throws IOException {
    if(lineReader != null) {
        lineReader.close();
    }
}

public GWASRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);
    lineKey = lineReader.createKey();
    lineValue = lineReader.createValue();
}

@Override
public LongWritable createKey() {
    return new LongWritable();
}

@Override
public GWASGenotypeBean createValue() {
    return new GWASGenotypeBean();
}

@Override
public long getPos() throws IOException {
    return lineReader.getPos();
}

@Override
public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException {
    if(!lineReader.next(lineKey, lineValue)){
        return false;
    }

    String[] values = lineValue.toString().split(",");

    if(values.length != 32) {
        throw new IOException("Invalid Record");
    }

    value.setPROJECT_NAME(values[0]);
    value.setRESEARCH_CODE(values[1]);
    value.setFACILITY_CODE(values[2]);
    value.setPROJECT_CODE(values[3]);
    value.setINVESTIGATOR(values[4]);
    value.setPATIENT_NUMBER(values[5]);
    value.setSAMPLE_COLLECTION_DATE(values[6]);
    value.setGENE_NAME(values[7]);
    value.setDbSNP_RefSNP_ID(values[8]);
    value.setSNP_ID(values[9]);
    value.setALT_SNP_ID(values[10]);
    value.setSTRAND(values[11]);
    value.setASSAY_PLATFORM(values[12]);
    value.setSOFTWARE_NAME(values[13]);
    value.setSOFTWARE_VERSION_NUMBER(values[14]);
    value.setTEST_DATE(values[15]);
    value.setPLATE_POSITION(values[16]);
    value.setPLATE_ID(values[17]);
    value.setOPERATOR(values[18]);
    value.setGENOTYPE(values[19]);
    value.setGENOTYPE_QS1_NAME(values[20]);
    value.setGENOTYPE_QS2_NAME(values[21]);
    value.setGENOTYPE_QS3_NAME(values[22]);
    value.setGENOTYPE_QS4_NAME(values[23]);
    value.setGENOTYPE_QS5_NAME(values[24]);
    value.setGENOTYPE_QS1_RESULT(values[25]);
    value.setGENOTYPE_QS2_RESULT(values[26]);
    value.setGENOTYPE_QS3_RESULT(values[27]);
    value.setGENOTYPE_QS4_RESULT(values[28]);
    value.setGENOTYPE_QS5_RESULT(values[29]);
    value.setSTAGE(values[30]);
    value.setLAB(values[31]);
    return true;
}

@Override
public float getProgress() throws IOException {
    return lineReader.getProgress();
}

}

The Mapper class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.google.common.base.Strings;

public class GWASMapper extends MapReduceBase implements Mapper<LongWritable, GWASGenotypeBean, Text, Text> {

private static Configuration conf;

@SuppressWarnings("rawtypes")
public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException {
    conf = context.getConfiguration();
    // Path[] otherFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
}


@Override
public void map(LongWritable inputKey, GWASGenotypeBean inputValue, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

    checkForNulls(inputValue, inputKey.toString());

    output.collect(new Text(inputValue.getPROJECT_CODE()), new Text(inputValue.getFACILITY_CODE()));

}

private void checkForNulls(GWASGenotypeBean user, String inputKey) {

    String f1 = " does not have a value_fail";
    String p1 = "Must not contain NULLS for required fields";
    // have to initialize these two to some paths in hdfs

    String edtChkRptDtl = "/user/hduser/output6/detail" + inputKey + ".csv";
    String edtChkRptSmry = "/user/hduser/output6/summary" + inputKey + ".csv";

    List<String> errSmry = new ArrayList<String>();
    Map<String, String> loc = new TreeMap<String, String>();

    if(Strings.isNullOrEmpty(user.getPROJECT_NAME())) {
        loc.put("test", "PROJECT_NAME ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getRESEARCH_CODE())) {
        loc.put("test", "RESEARCH_CODE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getFACILITY_CODE())) {
        loc.put("test", "FACILITY_CODE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPROJECT_CODE())) {
        loc.put("test", "PROJECT_CODE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getINVESTIGATOR())) {
        loc.put("test", "INVESTIGATOR ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPATIENT_NUMBER())) {
        loc.put("test", "PATIENT_NUMBER ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSAMPLE_COLLECTION_DATE())) {
        loc.put("test", "SAMPLE_COLLECTION_DATE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getGENE_NAME())) {
        loc.put("test", "GENE_NAME ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSTRAND())) {
        loc.put("test", "STRAND ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getASSAY_PLATFORM())) {
        loc.put("test", "ASSAY_PLATFORM ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSOFTWARE_NAME())) {
        loc.put("test", "SOFTWARE_NAME ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getTEST_DATE())) {
        loc.put("test", "TEST_DATE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPLATE_POSITION())) {
        loc.put("test", "PLATE_POSITION ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getPLATE_ID())) {
        loc.put("test", "PLATE_ID ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getOPERATOR())) {
        loc.put("test", "OPERATOR ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getGENOTYPE())) {
        loc.put("test", "GENOTYPE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getSTAGE())) {
        loc.put("test", "STAGE ");
        errSmry.add("_fail");
    } else if(Strings.isNullOrEmpty(user.getLAB())) {
        loc.put("test", "LAB ");
        errSmry.add("_fail");
    }

    String customNullMsg = "Required Genotype column(s)";
    List<String> error = new ArrayList<String>();
    String message = null;

    if(!loc.isEmpty()) {
        for (Map.Entry<String, String> entry : loc.entrySet()) {
            message = "line:" + entry.getKey() + " column:" + entry.getValue() + " " + f1;
            error.add(message);
        }
    } else {
        message = "_pass";
        error.add(message);
    }

    int cnt = 0;
    if(!errSmry.isEmpty()) {

        // Not able to understand this: are we trying to count the
        // occurrences of the key that contains "_fail"?
        for (String key : errSmry) {
            if(key.contains("_fail")) {
                cnt = Collections.frequency(errSmry, key);
                // ******************** Nikhil added this
                break;
            }
        }

        if(cnt > 0) {
            writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "failed", Integer.toString(cnt));
        } else {
            writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0");
        }

    } else {
        writeCsvFileSmry(edtChkRptSmry, customNullMsg, p1, "passed", "0");
    }

    // loop the list and write out items to the error report file
    if(!error.isEmpty()) {
        for (String s : error) {
            //System.out.println(s);
            if(s.contains("_fail")) {
                String updatedFailmsg = s.replace("_fail", "");
                writeCsvFileDtl(edtChkRptDtl, "genotype", updatedFailmsg, "failed");
            }
            if(s.contains("_pass")) {
                writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed");
            }
        }
    } else {
        writeCsvFileDtl(edtChkRptDtl, "genotype", p1, "passed");
    }
    // end loop
}

private void writeCsvFileDtl(String edtChkRptDtl, String col1, String col2, String col3) {
    try {
        if(conf == null) {
            conf = new Configuration();
        }
        FileSystem fs = FileSystem.get(conf);

        Path path = new Path(edtChkRptDtl);
        if (!fs.exists(path)) {
            FSDataOutputStream out = fs.create(path);
            out.writeChars(col1);
            out.writeChar(',');
            out.writeChars(col2);
            out.writeChar(',');
            out.writeChars(col3);
            out.writeChar('\n');
            out.flush();
            out.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private void writeCsvFileSmry(String edtChkRptSmry, String col1, String col2, String col3, String col4) {
    try {
        if(conf == null) {
            conf = new Configuration();
        }
        FileSystem fs = FileSystem.get(conf);

        Path path = new Path(edtChkRptSmry);
        if (!fs.exists(path)) {
            FSDataOutputStream out = fs.create(path);
            out.writeChars(col1);
            out.writeChar(',');
            out.writeChars(col2);
            out.writeChar(',');
            out.writeChars(col3);
            out.writeChar(',');
            out.writeChars(col4);
            out.writeChar('\n');
            out.flush();
            out.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
}

Here is my driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GWASMapReduce extends Configured implements Tool{

/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    ToolRunner.run(configuration, new GWASMapReduce(), args);
}

@Override
public int run(String[] arg0) throws Exception {

    JobConf conf = new JobConf(new Configuration());
    conf.setInputFormat(GWASInputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setJarByClass(GWASMapReduce.class);
    conf.setMapperClass(GWASMapper.class);
    conf.setNumReduceTasks(0);
    FileInputFormat.addInputPath(conf, new Path(arg0[0]));
    FileOutputFormat.setOutputPath(conf, new Path(arg0[1]));
    JobClient.runJob(conf);
    return 0;
}
}

1 Answer


There is probably only one Mapper task, whose map method gets invoked ten times. If you want each Mapper to write out one file, you should do that in its configure method. If you want one file written out per input record, you should do that in its map method.
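
For illustration, here is a minimal sketch of that per-mapper approach, assuming the old org.apache.hadoop.mapred API used in the question. The side-file path and CSV header are made up, and reading the task attempt id from the "mapred.task.id" property is an assumption about how to obtain a unique name per task:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

public class PerMapperSideFile extends MapReduceBase {

    // In the old mapred API, configure() runs once per Mapper task,
    // so this file is created once per task, not once per record.
    @Override
    public void configure(JobConf job) {
        try {
            FileSystem fs = FileSystem.get(job);
            // "mapred.task.id" is unique per task attempt; using it keeps
            // the side files from colliding (hypothetical path).
            String taskId = job.get("mapred.task.id");
            Path sideFile = new Path("/user/hduser/output6/side_" + taskId + ".csv");
            FSDataOutputStream out = fs.create(sideFile);
            out.writeChars("col1,col2\n");
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}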

Edit: it turns out the above is unrelated to the problem. The problem is that in GWASInputFormat you never set the key in the next method, so your map input key is always the same. Just add key.set(lineKey.get()); in the next method and it should work.
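
Applied to the GWASRecordReader above, that one-line fix would look like this (a sketch of just the changed method; the 32-field parsing is elided):

@Override
public boolean next(LongWritable key, GWASGenotypeBean value) throws IOException {
    if(!lineReader.next(lineKey, lineValue)){
        return false;
    }

    // Copy the byte offset produced by the LineRecordReader into the key
    // passed to the mapper, so each record gets a distinct input key
    // (and each filename_inputkey side file gets a distinct name).
    key.set(lineKey.get());

    // ... populate value from the 32 comma-separated fields as before ...
    return true;
}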

answered 2012-11-05T22:21:47.610