1

我需要将聚合结果填充到 3 个单独的接收器 - 更新逻辑略有不同的映射。我试图将管道对象转换为 DAG 并将另一条边添加到倒数第二个顶点,但它引发了一个异常,即不允许多个出站边。有没有办法创建具有多个出站边缘的 DAG?

4

1 回答 1

1

您应该能够将要排放到多个接收器的阶段分配给一个变量,然后drainTo()使用不同的接收器重复调用它。

例子:

StreamStage<TimestampedEntry<..>> stage = pipeline.drawFrom(..)
                             .map(..)
                             .groupingKey(..)
                             .window(..)
                             .aggregate(counting());

stage.drainTo(Sinks.map("map1));
stage.drainTo(Sinks.map("map2")).

如果您想使用 DAG API 实现相同的目的,则需要使用该构造将它们分配给不同的序数。Edge.from().to()但是,如果您已经开始使用管道,则这不是必需的。

于 2018-07-25T21:07:49.643 回答