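I have a sliding window that may produce empty results, together with a custom aggregate accumulator. What is the correct way to discard such "empty" aggregate accumulators before they reach the sink?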

        Pipeline pipeline = Pipeline.create();
        pipeline.drawFrom(Sources.<Long, Foo>map("map"))
                .map(Map.Entry::getValue)
                .addTimestamps(Foo::getTimeMillisecond, LIMIT)
                .window(WindowDefinition.sliding(100, 10))
                .aggregate(FooAggregateOperations.aggregateFoo(), (s, e, r) -> {
                    return String.format("started: %s\n%s\nended: %s\n", s, r, e);
                })
                .drainTo(Sinks.files(sinkDirectory));

As you can see, the aggregate operation returns a String:

    public class FooAggregateOperations {

        public static AggregateOperation1<Foo, FooAccumulator, String> aggregateFoo() {
            return AggregateOperation
                    .withCreate(FooAccumulator::new)
                    .andAccumulate(FooAggregateOperations::accumulate)
                    .andCombine(FooAggregateOperations::combine)
                    .andDeduct(FooAggregateOperations::deduct)
                    .andFinish(FooAccumulator::getResult);
        }
    }

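The question is basically: how do I discard ignorable window/aggregation results before they are combined with or deducted from other results, or flushed to the sink?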

1 Answer

To filter out empty aggregation results, you can use the following approach:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Long, Foo>map("map"))
            .map(Map.Entry::getValue)
            .addTimestamps(Foo::getTimeMillisecond, LIMIT)
            .window(WindowDefinition.sliding(100, 10))
            .aggregate(FooAggregateOperations.aggregateFoo(),
                    (s, e, r) -> tuple3(s, e, r))
            .filter(t -> !isEmpty(t.f2()))
            .map(t -> String.format("started: %s\n%s\nended: %s\n", t.f0(), t.f2(), t.f1()))
            .drainTo(Sinks.files("sinkDirectory"));

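This stores each aggregation result in a temporary tuple, then applies the filter, and only then applies the final mapping to a String. Here `isEmpty` is a placeholder for your own check of whether a result counts as empty.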
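To make the filter-then-format step concrete without a running Jet cluster, here is a minimal sketch in plain Java streams. Everything in it is an assumption for illustration: `WindowResult` is a hypothetical stand-in for the tuple of window start, window end, and result, and `isEmpty` assumes that an "empty" result is a null or zero-length String. Your own accumulator may define emptiness differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class EmptyWindowFilterSketch {

    // Hypothetical stand-in for the (start, end, result) tuple used in the pipeline.
    static final class WindowResult {
        final long start;
        final long end;
        final String result;

        WindowResult(long start, long end, String result) {
            this.start = start;
            this.end = end;
            this.result = result;
        }
    }

    // Assumed emptiness check: a null or zero-length result is treated as empty.
    static boolean isEmpty(String result) {
        return result == null || result.isEmpty();
    }

    // Drops empty window results first, then formats the remaining ones.
    static List<String> formatNonEmpty(List<WindowResult> windows) {
        return windows.stream()
                .filter(w -> !isEmpty(w.result))
                .map(w -> String.format("started: %s\n%s\nended: %s\n",
                        w.start, w.result, w.end))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<WindowResult> windows = Arrays.asList(
                new WindowResult(0, 100, ""),      // empty window, dropped
                new WindowResult(10, 110, "a"));   // non-empty window, kept
        formatNonEmpty(windows).forEach(System.out::print);
    }
}
```

The ordering is the point: because the filter runs on the intermediate value rather than on the formatted String, the emptiness check does not have to parse the output format.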

I have also created an issue on GitHub, and we will consider supporting this behavior in the aggregate operation itself.

Answered 2018-04-25T07:38:22.757