我是 Apache Storm 的新手。
我正在尝试使用 Apache Kafka、Storm 和 ESPER CEP 引擎开发一个实时流处理系统。
为此,我有一个 KafkaSpout,它将向 Bolts(具有我的 CEP 查询)发出流以过滤流。
我已经创建了一个拓扑,我正在尝试在本地集群上运行它
问题是在我的 bolts 中运行的 CEP 查询需要成批的元组来对流执行窗口操作。在我的拓扑中,KafkaSpout 一次只向 Bolts 发送一个元组进行处理。所以我的 CEP 查询没有按预期工作。
我在 Storm 中使用默认的 KafkaSpout。有什么办法可以一次将多个不同的元组发送到螺栓?一些配置调整可以做到这一点,还是我需要为此制作我的自定义 KafkaSpout?
请帮忙!!
我的拓扑:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG, "weather-consumer-group").build()),4 );
builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");
builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A")
我正在使用 2 个螺栓和一个喷嘴。
我在 Bolt A 中运行的 esper 查询是
从 weatherEvent.win:length(3) 中选择 first(e), last(e) 作为 e
在这里,我试图从事件流的长度为 3 的窗口中获取第一个和最后一个事件。但是我得到相同的第一个和最后一个事件,因为 KafkaSpout 一次只发送一个元组。