
I started using Hadoop only a short while ago and am trying to implement a join in Java, either map-side or reduce-side. I went with the reduce-side join because it is supposed to be easier to implement. I know Java is not the best choice for joins, aggregations, and so on, and that Hive or Pig, which I have already worked with, would be a better fit. However, I am doing a research project in which I have to use all three of these languages in order to compare them.

Anyway, I have two input files with different structures. One is key|value and the other is key|value1;value2;value3;value4. A record from each input file looks like this:

  • Input 1: 1;2010-01-10T00:00:01
  • Input 2: 1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59

I followed the example from the book Hadoop: The Definitive Guide, but it does not work for me. I am posting my Java files here so you can see what is wrong.

public class LookupReducer extends Reducer<TextPair,Text,Text,Text> {


private String result = "";
private String msisdn;
private String attribute, product;
private long trans_dt_long, start_dt_long, end_dt_long; 
private String trans_dt, start_dt, end_dt; 

@Override
public void reduce(TextPair key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {

        context.progress();

        Iterator<Text> iter = values.iterator();

        for (Text val : values) {

            // the value without the key, which we still need to remember
            Text recordNoKey = val;     //new Text(iter.next());

            String[] valSplitted = recordNoKey.toString().split(";");

            // if the input is coming from CDR, set the corresponding values
            if (key.getSecond().toString().equals(CDR.CDR_TAG)) {
                trans_dt = recordNoKey.toString();
                trans_dt_long = dateToLong(recordNoKey.toString());
            }
            // if the input is coming from Attributes, set the corresponding values
            else if (key.getSecond().toString().equals(Attribute.ATT_TAG)) {
                attribute = valSplitted[0];
                product = valSplitted[1];
                start_dt = valSplitted[2];
                start_dt_long = dateToLong(valSplitted[2]);
                end_dt = valSplitted[3];
                end_dt_long = dateToLong(valSplitted[3]);
            }

            Text record = val;      //iter.next();
            //System.out.println("RECORD: " + record);
            Text outValue = new Text(recordNoKey.toString() + ";" + record.toString());

            if (start_dt_long < trans_dt_long && trans_dt_long < end_dt_long) {
                // concatenate the output columns
                result = attribute + ";" + product + ";" + trans_dt;

                context.write(key.getFirst(), new Text(result));
                System.out.println("KEY: " + key);
            }
        }
}

private static long dateToLong(String date){
    // the input timestamps contain a literal 'T' between date and time, e.g. 2010-01-10T00:00:01
    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
    Date parsedDate = null;
    try {
        parsedDate = formatter.parse(date);
    } catch (ParseException e) {
        e.printStackTrace();
    }
    long dateInLong = parsedDate.getTime();

    return dateInLong;

}

public static class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair(){
        set(new Text(), new Text());
    }

    public TextPair(String first, String second){
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second){
        set(first, second);
    }

    public void set(Text first, Text second){
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public int hashCode(){
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o){
        if(o instanceof TextPair)
        {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString(){
        return first + ";" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if(cmp != 0)
            return cmp;
        return second.compareTo(tp.second);
    }


    public static class FirstComparator extends WritableComparator {

        protected FirstComparator(){
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable comp1, WritableComparable comp2){
            TextPair pair1 = (TextPair) comp1;
            TextPair pair2 = (TextPair) comp2;
            int cmp = pair1.getFirst().compareTo(pair2.getFirst());

            if(cmp != 0)
                return cmp;

            return -pair1.getSecond().compareTo(pair2.getSecond());
        }
    }

    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() 
        {
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable comp1, WritableComparable comp2)
        {
            TextPair pair1 =  (TextPair) comp1;
            TextPair pair2 =  (TextPair) comp2;

            return pair1.compareTo(pair2);
        }
    }

}

}

public class Joiner  extends Configured implements Tool {

public static final String DATA_SEPERATOR = ";";                                     //Define the symbol for separating the output data
public static final String NUMBER_OF_REDUCER = "1";                                  //Define the number of the used reducer jobs
public static final String COMPRESS_MAP_OUTPUT = "true";                             //if the output from the mapping process should be compressed, set COMPRESS_MAP_OUTPUT = "true" (if not set it to "false")
public static final String 
            USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec";    //set the used codec for the data compression
public static final boolean JOB_RUNNING_LOCAL = true;                                //if you run the Hadoop job on your local machine, you have to set JOB_RUNNING_LOCAL = true
                                                                                     //if you run the Hadoop job on the Telefonica Cloud, you have to set JOB_RUNNING_LOCAL = false
public static final String OUTPUT_PATH = "/home/hduser"; //set the folder, where the output is saved. Only needed, if JOB_RUNNING_LOCAL = false



public static class KeyPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        System.out.println("numPartitions: " + numPartitions);
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
}

private static Configuration hadoopconfig() {
    Configuration conf = new Configuration();

    conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR);
    conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT);
    //conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC);
    conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER);
    return conf;
}

@Override
public int run(String[] args) throws Exception {
    if ((args.length != 3) && (JOB_RUNNING_LOCAL)) {

        System.err.println("Usage: Lookup <CDR-inputPath> <Attribute-inputPath> <outputPath>");
        System.exit(2);
    }

    //starting the Hadoop job
    Configuration conf = hadoopconfig();
    Job job = new Job(conf, "Join cdrs and attributes");
    job.setJarByClass(Joiner.class);

    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class);
    //FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //expecting a folder instead of a file

    if(JOB_RUNNING_LOCAL)
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
    else
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));


    job.setPartitionerClass(KeyPartitioner.class);
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    job.setReducerClass(LookupReducer.class);

    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
}

 public static void main(String[] args) throws Exception {

     int exitCode = ToolRunner.run(new Joiner(), args);
     System.exit(exitCode);

 }
}

public class Attribute {

public static final String ATT_TAG = "1";


public static class AttributeMapper 
extends Mapper<LongWritable, Text, TextPair, Text>{

    private static Text values = new Text();
    //private Object output = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //partition the input line by the separator semicolon   
        String[] attributes = value.toString().split(";");
        String valuesInString = "";

        if (attributes.length != 5)
            System.err.println("Input column number not correct. Expected 5, provided " + attributes.length
                    + "\n" + "Check the input file");
        if (attributes.length == 5) {
            // setting the values with the input values read above
            valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4];
            values.set(valuesInString);
            // writing out the key and value pair
            context.write(new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values);
        }
    }
}

}

public class CDR {

    public static final String CDR_TAG = "0";

    public static class CDRMapper
            extends Mapper<LongWritable, Text, TextPair, Text> {

        private static Text values = new Text();
        private Object output = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // partition the input line by the separator semicolon
            String[] cdr = value.toString().split(";");

            // setting the values with the input values read above
            values.set(cdr[1]);
            //output = CDR_TAG + cdr[1];

            // writing out the key and value pair
            context.write(new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values);
        }
    }

}

I would be glad if someone could at least post a link to a tutorial or a simple example implementing this kind of join. I have searched a lot, but either the code was incomplete or the explanation was insufficient.


1 Answer


To be honest, I don't know what your code is trying to do, but that is probably because I would do it differently and am not familiar with the API you are using.

I would start from scratch as follows:

  • Write a mapper that reads one of the files. For each line it reads, write a key/value pair to the context. The key is a Text created from the join key, and the value is another Text created by concatenating "1" with the entire input record.
  • Write another mapper for the other file. This mapper works just like the first one, but its value is a Text created by concatenating "2" with the entire input record.
  • Write a reducer to do the join. The reduce() method will receive all records written for a particular key. You can tell which input file a value came from (and therefore the record's data format) by whether it starts with "1" or "2". Once you know whether you have one record type, the other, or both, you can write whatever logic you need to merge the data from one or both records; see the sketch after this list.
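
Here is a minimal sketch of that approach, assuming semicolon-separated records like yours. The class names (TaggedJoin, CdrTagMapper, AttributeTagMapper, JoinReducer) are made up for illustration, and the reducer simply emits the cross product of the two sides; your own merge logic (for example the date-range check) would replace that last part.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TaggedJoin {

    // Mapper for the first file (key;value): tag every record with "1"
    public static class CdrTagMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String joinKey = line.toString().split(";", 2)[0];
            context.write(new Text(joinKey), new Text("1" + line.toString()));
        }
    }

    // Mapper for the second file (key;value1;value2;value3;value4): tag every record with "2"
    public static class AttributeTagMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String joinKey = line.toString().split(";", 2)[0];
            context.write(new Text(joinKey), new Text("2" + line.toString()));
        }
    }

    // Reducer: all records for one key arrive together; separate them by tag, then join
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> cdrs = new ArrayList<String>();
            List<String> attributes = new ArrayList<String>();
            for (Text value : values) {
                String tagged = value.toString();
                if (tagged.startsWith("1")) {
                    cdrs.add(tagged.substring(1));
                } else if (tagged.startsWith("2")) {
                    attributes.add(tagged.substring(1));
                }
            }
            // emit every combination; any filtering (e.g. on date ranges) would go here
            for (String cdr : cdrs) {
                for (String attribute : attributes) {
                    context.write(key, new Text(cdr + ";" + attribute));
                }
            }
        }
    }
}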

By the way, you use the MultipleInputs class to configure more than one mapper in your job/driver class, for example as shown below.
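
The driver could then be wired up roughly like this (again only a sketch, reusing the hypothetical TaggedJoin classes from above):

    Configuration conf = new Configuration();
    Job job = new Job(conf, "reduce-side join");
    job.setJarByClass(TaggedJoin.class);

    // one mapper per input file, both writing the same key/value types
    MultipleInputs.addInputPath(job, new Path(args[0]),
            TextInputFormat.class, TaggedJoin.CdrTagMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, TaggedJoin.AttributeTagMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setReducerClass(TaggedJoin.JoinReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);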

Answered on 2012-12-07 at 17:44