这个问题是关于 Hazelcast Jet 0.5.1 中的 Pipeline API
我正在尝试创建的管道有两个无限源:一个是代码(每分钟发送一个事件的自定义源),另一个是 Kafka 主题。
它看起来像这样:
Pipeline pipeline = Pipeline.create();
ComputeStage<Object> tickerSource = pipeline.drawFrom(Sources.fromProcessor("ticker", TickerSource.getSupplier()));
ComputeStage<Object> kafkaSource = pipeline.drawFrom(KafkaSources.kafka(sourceProperties, KAFKA_TOPIC));
当其中任何一个源发出事件时,我希望该事件经过相同的步骤并排入相同的接收器。如果我们将我的问题转换为 SQL 术语,我想要一个“UNION”。看起来像这样的东西:
我发现的所有关于将两个节点合二为一的示例和文档都相当于 SQL“JOIN”操作,而不是“UNION”。
我发现绕过我的问题的唯一方法是做这样的事情,但我觉得这是框架应该已经具备的东西,尽管我似乎找不到它。
Arrays.asList(tickerSource, kafkaSource).forEach(source ->
{
ComputeStage<Object> result = source.map(MyCustomProcessor::process);
result.drainTo(Sinks.fromProcessor("first-sink", MyFirstSink.getSupplier());
result.drainTo(Sinks.fromProcessor("second-sink", MySecondSink.getSupplier());
});
结果如下所示: