2

在 GenericUDAFCount.java 中:

@Description(name = "count",
value = "_FUNC_(*) - Returns the total number of retrieved rows, including "
      +        "rows containing NULL values.\n"

      + "_FUNC_(expr) - Returns the number of rows for which the supplied "
      +        "expression is non-NULL.\n"

      + "_FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for "
      +        "which the supplied expression(s) are unique and non-NULL.")

但我在下面的求值器代码中没有看到任何处理 DISTINCT 表达式的逻辑。

/**
 * Evaluator backing the count aggregate.
 *
 * <p>Operates in one of two ways, selected by {@code countAllColumns}:
 * when the flag is set every row passed to {@link #iterate} is counted;
 * otherwise a row is counted only if none of its parameters is
 * {@code null}. The evaluator itself performs no de-duplication of rows;
 * it only increments a running total.
 */
public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {

  /** When true, rows are counted regardless of their column values. */
  private boolean countAllColumns = false;

  /** Inspector used by {@link #merge} to read a partial count as a long. */
  private LongObjectInspector partialCountAggOI;

  /** Reusable holder for the value handed back by {@link #terminate}. */
  private LongWritable result;

  /**
   * Initializes the evaluator: delegates to the superclass, allocates the
   * reusable result holder and records the writable-long inspector used
   * for partial results.
   *
   * @param m          evaluation mode, forwarded to the superclass
   * @param parameters input inspectors, forwarded to the superclass
   * @return the writable-long object inspector describing the output
   * @throws HiveException if the superclass initialization fails
   */
  @Override
  public ObjectInspector init(Mode m, ObjectInspector[] parameters)
      throws HiveException {
    super.init(m, parameters);
    result = new LongWritable(0);
    partialCountAggOI =
        PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    // Same inspector instance describes both the partial and final output.
    return partialCountAggOI;
  }

  /**
   * Switches the evaluator between "count every row" and "count rows whose
   * parameters are all non-null".
   *
   * @param countAllCols true to count every row
   * @return this evaluator, to allow call chaining
   */
  private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) {
    this.countAllColumns = countAllCols;
    return this;
  }

  /** Aggregation buffer holding the running row count. */
  static class CountAgg implements AggregationBuffer {
    long value;
  }

  /**
   * Creates a buffer and clears it through {@link #reset}.
   *
   * @return a new, zeroed {@code CountAgg}
   * @throws HiveException if resetting the buffer fails
   */
  @Override
  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    final CountAgg fresh = new CountAgg();
    reset(fresh);
    return fresh;
  }

  /**
   * Sets the running count of the given buffer back to zero.
   *
   * @param agg buffer to clear; must be a {@code CountAgg}
   */
  @Override
  public void reset(AggregationBuffer agg) throws HiveException {
    final CountAgg target = (CountAgg) agg;
    target.value = 0;
  }

  /**
   * Processes one input row, incrementing the buffer when the row qualifies.
   *
   * @param agg        buffer to update; must be a {@code CountAgg}
   * @param parameters the row's argument values, or {@code null} when the
   *                   input table/split is empty (in which case nothing
   *                   happens)
   */
  @Override
  public void iterate(AggregationBuffer agg, Object[] parameters)
      throws HiveException {
    // A null array signals an empty input table/split: nothing to count.
    if (parameters == null) {
      return;
    }

    if (countAllColumns) {
      assert parameters.length == 0;
    } else {
      assert parameters.length > 0;
      // A single null argument disqualifies the whole row.
      if (containsNull(parameters)) {
        return;
      }
    }

    ((CountAgg) agg).value++;
  }

  /** Reports whether any element of {@code values} is {@code null}. */
  private static boolean containsNull(Object[] values) {
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        return true;
      }
    }
    return false;
  }

  /**
   * Adds a partial count to the buffer.
   *
   * @param agg     buffer to update; must be a {@code CountAgg}
   * @param partial partial count readable through the partial-count
   *                inspector, or {@code null} to leave the buffer untouched
   */
  @Override
  public void merge(AggregationBuffer agg, Object partial)
      throws HiveException {
    if (partial == null) {
      return;
    }
    // Read the partial value first, then touch the buffer.
    final long partialCount = partialCountAggOI.get(partial);
    ((CountAgg) agg).value += partialCount;
  }

  /**
   * Copies the buffer's count into the reusable result holder.
   *
   * @param agg buffer to read; must be a {@code CountAgg}
   * @return the shared {@code LongWritable} holding the count
   */
  @Override
  public Object terminate(AggregationBuffer agg) throws HiveException {
    final long total = ((CountAgg) agg).value;
    result.set(total);
    return result;
  }

  /**
   * Produces the partial result, which is the same value that
   * {@link #terminate} returns.
   *
   * @param agg buffer to read; must be a {@code CountAgg}
   * @return the shared {@code LongWritable} holding the count so far
   */
  @Override
  public Object terminatePartial(AggregationBuffer agg) throws HiveException {
    return terminate(agg);
  }

}

Hive 是如何实现 count(distinct ...) 的?这类任务运行时确实要花费很长时间。相关实现位于源代码的什么位置?

4

1 回答 1

1

DISTINCT 表达式并不是 COUNT 的标志或选项,而是被独立求值的——这也是您可以单独运行 SELECT DISTINCT column1 FROM table1 的原因。

这个页面说:

对于 DISTINCT 的实现,对绑定到参数类型的数据所做的实际过滤是由框架处理的,而不是由 COUNT UDAF 的实现来处理。

如果您想深入了解源代码细节,请查看 Hive 的 Git 存储库。

于 2012-09-05T03:26:23.293 回答