0

我需要在HBase表上运行 MapReduce 以求和Double值。我按照HBase 文档中的示例进行操作。

这些属性最初是通过将字符串转换为字节数组(使用 HBase 的Bytes.toBytes(value))来存储的,但现在我需要获取它们Double以求和它们的值。对于只有正值的列,它给了我正确的总和,但我有一列也有一些负值(diferenca在下面的代码中调用)。

当我运行这项工作时,它给了我一个错误的答案,我注意到这就像 Reduce 任务正在获取数字模块或类似的东西。但是当我调试时,我看到解析的 Double 对象valor是正确的负数。我不知道这是什么原因造成的...

作业设置:

 job = new Job(hTable.getConfiguration(), "All");
            job.setJarByClass(HBaseQuery.class);
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            TableMapReduceUtil.initTableMapperJob(
                    tabela,        // input table
                    scan,               
                    CalculoTotaisMapper.class,     // mapper class
                    Text.class,         // mapper output key
                    DoubleWritable.class,  // mapper output value
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    tabela,        // output table
                    CalculoTotaisReducer.class,    // reducer class
                    job);
            job.setNumReduceTasks(1);   // at least one, adjust as required

            boolean b = job.waitForCompletion(true);

我的映射器:

public class CalculoTotaisMapper extends TableMapper<Text, DoubleWritable> {

    public static final String[] ATTRS = 
      new String[]{"valortotalprestador", "valortotalconvenio", "diferenca"};


    private Text text = new Text();
    private Logger logger = LoggerFactory.getLogger(CalculoTotaisMapper.class);

    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {

        Double val = 0.0;
        for (String attr : ATTRS) {
            byte[] valueBytes = value.getValue("hc".getBytes(), attr.getBytes());
            String valueString = Bytes.toString(valueBytes);

            val = Double.parseDouble(valueString);

            text.set(attr);

            context.write(text, new DoubleWritable(val));


        }

    }
}

我的减速机:

public class CalculoTotaisReducer extends TableReducer <Text, DoubleWritable,
                                                   ImmutableBytesWritable> {

    public static final byte[] CF = "qu".getBytes();

    public void reduce(Text key, Iterable<DoubleWritable> values,
       Context context) throws IOException, InterruptedException {
        double i = 0.0;
        for (DoubleWritable val : values) {
            double valor =  val.get();
           i += valor;
        }

        Put put = new Put(Bytes.toBytes("all"));
        put.add(CF, key.getBytes(), Bytes.toBytes(i));

        context.write(null, put);
    }

}
4

0 回答 0