2

在不使用 UDF 的情况下,如何计算 Apache Pig 中数据集的统计模式?

A,20
A,10
A,10
B,40
B,40
B,20
B,10

data = LOAD 'myData.txt' USING PigStorage(',') AS key, value;
byKey = GROUP data BY key;
mode = FOREACH byKey GENERATE MODE(data.value);  -- How to define MODE() ??
DUMP mode;
-- Correct answer:  (A, 10), (B, 40)
4

3 回答 3

3

这是一个版本,每个键只能找到一个结果:

data = LOAD 'mode_data.dat' USING PigStorage(',') AS (key, value);
byKeyValue = GROUP data BY (key, value); 
cntKeyValue = FOREACH byKeyValue GENERATE FLATTEN(group) AS (key, value), COUNT(data) as cnt;
byKey = GROUP cntKeyValue BY key;
mode = FOREACH byKey {
    freq = ORDER cntKeyValue BY cnt DESC;
    topFreq = LIMIT freq 1; -- one of the most frequent values for key of the group
    GENERATE FLATTEN(topFreq.(key, value));
};

这个版本会为同一个键找到所有同样频繁的值:

data = LOAD 'mode_data.dat' USING PigStorage(',') AS (key, value);
byKeyValue = GROUP data BY (key, value);
cntKeyValue = FOREACH byKeyValue GENERATE FLATTEN(group) AS (key, value), COUNT(data) as cnt;
byKey = GROUP cntKeyValue BY key;
mostFreqCnt = FOREACH byKey { -- calculate the biggest count for each key
    freq = ORDER cntKeyValue BY cnt DESC;
    topFreq = LIMIT freq 1;
    GENERATE FLATTEN(topFreq.(key, cnt)) as (key, cnt);
};

modeAll = COGROUP cntKeyValue BY (key, cnt), mostFreqCnt BY (key, cnt); -- get all values with the same count and same key, used cogroup as next command was throwing some errors during execution
mode = FOREACH (FILTER modeAll BY not IsEmpty(mostFreqCnt)) GENERATE FLATTEN(cntKeyValue.(key, value)) as (key, value);
于 2012-12-30T19:19:15.597 回答
1

我有一个简单的 UDF 用于计算这里的模式(它使用 apache commons-math3,pig 0.10.0):

public class MODE extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple inputTuple) throws IOException {
        if (inputTuple == null || inputTuple.size() == 0) {
            return null;
        }
        try {
            Frequency frequency = new Frequency();
            DataBag output = mBagFactory.newDefaultBag();
            DataBag values = (DataBag) inputTuple.get(0);
            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                Tuple tuple = it.next();
                frequency.addValue((Long) tuple.get(0));
            }
            Set<Long> setOfMostFrequentVals = new HashSet<Long>();
            Long greatestFrequency = 0l;
            for (Iterator<Comparable<?>> it = frequency.valuesIterator(); it.hasNext();) {
                Long val = (Long) it.next();
                if (frequency.getCount(val) >= greatestFrequency) {
                    if (frequency.getCount(val) > greatestFrequency) {
                        setOfMostFrequentVals.clear();
                        greatestFrequency = frequency.getCount(val);
                    }
                    setOfMostFrequentVals.add(val);
                }
            }
            for (Long mostFequentVal : setOfMostFrequentVals) {
            output.add(mTupleFactory.newTuple(mostFequentVal));
        }
    return output;
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing mode in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }
}
于 2012-12-28T10:13:33.473 回答
0

为.编写Eval Function 用户定义函数MODE

一般来说,Pig 的评估函数数量非常有限,当您开始做比 min/max/count 更复杂的事情时,您需要熟悉编写 UDF。

于 2012-12-27T21:43:10.190 回答