0

我是 Trident 的新手,我希望创建一个类似于“Sum()”的“Average”聚合器,但用于“Average”。以下不起作用:

       public class Average implements CombinerAggregator<Long>.......{

       public Long init(TridentTuple tuple)
       {
       (Long)tuple.getValue(0);
        }
        public Long Combine(long val1,long val2){
        return val1+val2/2;
        }
        public Long zero(){
        return 0L;
         }
       }

它可能在语法上并不完全正确,但这就是想法。如果可以的话请帮忙。给定 2 个值为 [2,4,1] 和 [2,2,5] 以及字段 'a'、'b' 和 'c' 的元组,并对字段 'b' 进行平均应该返回 '3'。我不完全确定 init() 和 zero() 是如何工作的。

非常感谢您提前提供的帮助。

伊莱

4

2 回答 2

1
public class Average implements CombinerAggregator<Number> {

int count = 0;
double sum = 0;

@Override
public Double init(final TridentTuple tuple) {
    this.count++;
    if (!(tuple.getValue(0) instanceof Double)) {

        double d = ((Number) tuple.getValue(0)).doubleValue();

        this.sum += d;

        return d;
    }

    this.sum += (Double) tuple.getValue(0);
    return (Double) tuple.getValue(0);

}

@Override
public Double combine(final Number val1, final Number val2) {
    return this.sum / this.count;

}

@Override
public Double zero() {
    this.sum = 0;
    this.count = 0;
    return 0D;
}
}
于 2013-11-05T17:30:46.397 回答
0

当谈到 Trident 时,我也是一个完整的新手,所以如果以下内容可行,我并不完全是。但它可能:

public class AvgAgg extends BaseAggregator<AvgState> {
    static class AvgState {
        long count = 0;
        long total = 0;

        double getAverage() {
            return total/count;
        }
    }

    public AvgState init(Object batchId, TridentCollector collector) {
        return new AvgState();
    }

    public void aggregate(AvgState state, TridentTuple tuple, TridentCollector collector) {
        state.count++;
        state.total++;
    }

    public void complete(AvgState state, TridentCollector collector) {
        collector.emit(new Values(state.getAverage()));
    }
}
于 2013-10-30T18:53:55.483 回答