3

我注意到 java apache beam 有类 groupby.sortbytimestamp python 是否实现了该功能?如果不是,那么在窗口中对元素进行排序的方法是什么?我想我可以在 DoFn 中对整个窗口进行排序,但我想知道是否有更好的方法。

4

2 回答 2

6

目前 Beam 中没有内置的值排序(在 Python 或 Java 中)。现在,最好的选择是您自己在 DoFn 中对值进行排序,就像您提到的那样。

于 2016-09-29T17:17:47.673 回答
1

这是使用 CombineFn 的解决方案。它具有使用 TreeSet 删除重复数据的额外好处。您还应该确保您的窗口数据足够小以适合单个工作人员的内存。

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
    return new TreeSet<>(Comparator
            .comparingLong(MarketData::getEventTime)
            .thenComparing(MarketData::getOrderbookType));
}

@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
    accum.add(input);
    return accum;
}

@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {

    TreeSet<MarketData> merged = createAccumulator();
    for (TreeSet<MarketData> accum : accums) {
        merged.addAll(accum);
    }
    return merged;
}

@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
    return Lists.newArrayList(accum.iterator());
}

}

于 2017-12-14T00:05:52.223 回答