我有一个可能有空结果的滑动窗口和自定义聚合累加器。丢弃此类“空”聚合累加器进入接收器的正确方法是什么?
Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.<Long, Foo>map("map"))
.map(Map.Entry::getValue)
.addTimestamps(Foo::getTimeMillisecond, LIMIT)
.window(WindowDefinition.sliding(100, 10))
.aggregate(FooAggregateOperations.aggregateFoo(), (s, e, r) -> {
return String.format("started: %s\n%s\nended: %s\n", s, r, e);
})
.drainTo(Sinks.files(sinkDirectory));
如您所见,聚合器返回字符串:
public class FooAggregateOperations {
public static AggregateOperation1<Foo, FooAccumulator, String> aggregateFoo() {
return AggregateOperation
.withCreate(FooAccumulator::new)
.andAccumulate(FooAggregateOperations::accumulate)
.andCombine(FooAggregateOperations::combine)
.andDeduct(FooAggregateOperations::deduct)
.andFinish(FooAccumulator::getResult);
}
}
问题基本上是,在将可忽略的窗口/聚合结果与其他结果合并/扣除或冲入接收器之前丢弃它们的方法是什么?