0

我是 Apache Storm 的新手。

我正在尝试使用 Apache Kafka、Storm 和 ESPER CEP 引擎开发一个实时流处理系统。

为此,我有一个 KafkaSpout,它将向 Bolts(具有我的 CEP 查询)发出流以过滤流。

我已经创建了一个拓扑,我正在尝试在本地集群上运行它

问题是在我的 bolts 中运行的 CEP 查询需要成批的元组来对流执行窗口操作。在我的拓扑中,KafkaSpout 一次只向 Bolts 发送一个元组进行处理。所以我的 CEP 查询没有按预期工作。

我在 Storm 中使用默认的 KafkaSpout。有什么办法可以一次将多个不同的元组发送到螺栓?一些配置调整可以做到这一点,还是我需要为此制作我的自定义 KafkaSpout?

请帮忙!!

我的拓扑:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG, "weather-consumer-group").build()),4 );

builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");

builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A")

我正在使用 2 个螺栓和一个喷嘴。

我在 Bolt A 中运行的 esper 查询是

从 weatherEvent.win:length(3) 中选择 first(e), last(e) 作为 e

在这里,我试图从事件流的长度为 3 的窗口中获取第一个和最后一个事件。但是我得到相同的第一个和最后一个事件,因为 KafkaSpout 一次只发送一个元组。

4

1 回答 1

0

spout 做不到,但您可以使用 Storm 的窗口支持https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html,或者只是编写一个聚合螺栓并将其放在 spout 之间和拓扑的其余部分。

所以你的拓扑应该是spout -> aggregator -> feature selection -> trend detection.

我建议您尝试内置的窗口支持,但如果您更愿意编写自己的聚合,您的 bolt 只需要接收一些元组(例如 3 个),并发出一个包含所有值的新元组。

聚合器螺栓应该做类似的事情

private List<Tuple> buffered;

execute(Tuple input) {
  if (buffered.size != 2) {
    buffered.add(input)
    return
  }
  Tuple first = buffered.get(0)
  Tuple second = buffered.get(1)
  Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
  List<Tuple> anchors = List.of(first, second, input)
  collector.emit(anchors, aggregate)
  collector.ack(first, second, input)
  buffered.clear()
}

这样,您最终会得到一个包含 3 个输入元组内容的元组。

于 2019-03-05T09:35:33.020 回答