
```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.StringUtils;

public class GenericUdafMemberLevel implements GenericUDAFResolver2 {
    private static final Log LOG = LogFactory
            .getLog(GenericUdafMemberLevel.class.getName());

@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo)
        throws SemanticException {
    return new GenericUdafMeberLevelEvaluator();
}

@Override
// validate the arguments
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
        throws SemanticException {
    if (parameters.length != 2) { // check the number of arguments
        throw new UDFArgumentTypeException(parameters.length - 1,
                "Exactly two arguments are expected.");
    }
    // the arguments must be primitive types, i.e. not complex types
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(0,
                "Only primitive type arguments are accepted but "
                        + parameters[0].getTypeName() + " is passed.");
    }

    if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(1,
                "Only primitive type arguments are accepted but "
                        + parameters[1].getTypeName() + " is passed.");
    }

    return new GenericUdafMeberLevelEvaluator();
}

public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {
    private PrimitiveObjectInspector inputOI;
    private PrimitiveObjectInspector inputOI2;
    private DoubleWritable result;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
            throws HiveException {
        super.init(m, parameters);
        if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){
            inputOI = (PrimitiveObjectInspector) parameters[0];
            inputOI2 = (PrimitiveObjectInspector) parameters[1];
            result = new DoubleWritable(0);
        }
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

    /** class for storing count value. */
    static class SumAgg implements AggregationBuffer {
        boolean empty;
        double value;
    }

    @Override
    // Create the buffer a new aggregation needs; it stores the running
    // sum across the mapper, combiner, and reducer stages.
    // Reset the buffer before use so it starts from a clean state.
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        SumAgg buffer = new SumAgg();
        reset(buffer);
        return buffer;
    }

    @Override
    // Reset the sum to 0.
    // MapReduce reuses mapper and reducer instances, so the buffer
    // must support reuse as well.
    public void reset(AggregationBuffer agg) throws HiveException {
        ((SumAgg) agg).value = 0.0;
        ((SumAgg) agg).empty = true;
    }

    private boolean warned = false;
    // Iterate: called in the map phase for each input row; simply adds
    // the input value to the running sum kept in agg.
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
        // parameters == null means the input table/split is empty
        if (parameters == null) {
            return;
        }
        try {
            double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);
            if (flag > 1.0)   // the condition on the second argument
                merge(agg, parameters[0]);   // reuse merge(), which the combiner also calls, to accumulate the map-side value
          } catch (NumberFormatException e) {
            if (!warned) {
              warned = true;
              LOG.warn(getClass().getSimpleName() + " "
                  + StringUtils.stringifyException(e));
            }
          }

    }

    @Override
    // The combiner merges map output, and the reducer merges mapper or combiner output.
    public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
        if (partial != null) {
            // read each field's value through its ObjectInspector
            double p = PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
            ((SumAgg) agg).value += p;
        }
    }


    @Override
    // Returns the partial result on the mapper side (also used as the final
    // result when there is a mapper but no reducer).
    public Object terminatePartial(AggregationBuffer agg)
            throws HiveException {
        return terminate(agg);
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        result.set(((SumAgg) agg).value);
        return result;
    }
}

}
```

To make the logic easier to follow, I added some comments to the code. The idea of the UDAF is as follows: `select test_sum(col1, col2) from tbl;` sums the values of col1 over the rows where col2 satisfies a certain condition. Most of the code is copied from the built-in avg() UDAF.
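For context, this is roughly how I deploy and call the function (the jar path is illustrative; the class above is in the default package, so the bare class name is used):

```sql
-- Illustrative deployment: add the compiled jar and register the class
-- under the name used in the query above.
ADD JAR /path/to/generic-udaf-member-level.jar;
CREATE TEMPORARY FUNCTION test_sum AS 'GenericUdafMemberLevel';

-- Sum col1 over the rows where col2 meets the condition (col2 > 1.0 here).
SELECT test_sum(col1, col2) FROM tbl;
```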

I am running into a tiresome exception:

```
java.lang.RuntimeException: Hive Runtime Error while closing operators
    at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:226)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1132)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:193)
    ... 8 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector.get(WritableLongObjectInspector.java:35)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:323)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:255)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:202)
    at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:236)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:474)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:800)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:1061)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1113)
    ... 13 more
```

Is there something wrong with my UDAF? Please point it out. Thank you.


1 Answer


Replace PrimitiveObjectInspectorFactory.writableLongObjectInspector in the init method with PrimitiveObjectInspectorFactory.writableDoubleObjectInspector. Your evaluator aggregates a double and terminate() returns a DoubleWritable, but the ObjectInspector returned by init tells Hive to expect a long, so the serializer tries to cast the DoubleWritable to a LongWritable and fails with exactly the ClassCastException in your stack trace.
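With that one-line change, the init method from the question becomes:

```java
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
        throws HiveException {
    super.init(m, parameters);
    if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
        inputOI = (PrimitiveObjectInspector) parameters[0];
        inputOI2 = (PrimitiveObjectInspector) parameters[1];
        result = new DoubleWritable(0);
    }
    // The evaluator emits a DoubleWritable, so the ObjectInspector
    // describing the output must be a double inspector, not a long one.
    return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
```

The rest of the evaluator can stay as it is, since it already works with doubles everywhere else.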

answered 2013-03-07T10:56:29.873