0

在我的 quarkus 服务中,我正在构建一个自定义累加器来使用Multi. 流可能是无限的,我想知道在积累了足够的数据后如何提前终止并收集结果?

下面是我的原型:

Multi<Data> sortedStream = getStream();
return this.sortedStream.collectItems().in(
    LinkedList::new,
    new BiConsumer<LinkedList<Coverage>, Data>() {
        @Override
        public void accept(LinkedList<Coverage> coverages, Data incoming) {
        if (coverages.isEmpty()) {
            coverages.add(new Coverage(incoming));
            return;
        }

        if (enough(coverages)) {
            // Question: How to early terminate and collect coverage downstream?
        }

        Coverage last = coverages.getLast();
        if (worthAdd(last, incoming)) {
            coverages.add(new Coverage(incoming));
        } else {
            return;
        }

        }
    }
)
4

1 回答 1

1

根据官方团队(https://github.com/smallrye/smallrye-mutiny/issues/501),collect收集所有物品,直到一个终端事件。我设法调整我的算法,想出一种方法来检查上游的每个项目并使用multi.select().firstAPI 提前终止。

于 2021-03-11T21:05:28.557 回答