2

这个问题是关于 Hazelcast Jet 0.5.1 中的 Pipeline API

我正在尝试创建的管道有两个无限源:一个是代码(每分钟发送一个事件的自定义源),另一个是 Kafka 主题。

它看起来像这样:

Pipeline pipeline = Pipeline.create();
ComputeStage<Object> tickerSource = pipeline.drawFrom(Sources.fromProcessor("ticker", TickerSource.getSupplier()));
ComputeStage<Object> kafkaSource  = pipeline.drawFrom(KafkaSources.kafka(sourceProperties, KAFKA_TOPIC));

当其中任何一个源发出事件时,我希望该事件经过相同的步骤并排入相同的接收器。如果我们将我的问题转换为 SQL 术语,我想要一个“UNION”。看起来像这样的东西:

目标管道

我发现的所有关于将两个节点合二为一的示例和文档都相当于 SQL“JOIN”操作,而不是“UNION”。

我发现绕过我的问题的唯一方法是做这样的事情,但我觉得这是框架应该已经具备的东西,尽管我似乎找不到它。

Arrays.asList(tickerSource, kafkaSource).forEach(source ->
{
    ComputeStage<Object> result = source.map(MyCustomProcessor::process);
    result.drainTo(Sinks.fromProcessor("first-sink", MyFirstSink.getSupplier());
    result.drainTo(Sinks.fromProcessor("second-sink", MySecondSink.getSupplier());
});

结果如下所示:

产生的管道

4

0 回答 0