0

我正在为 Kafka 制作多个主题。我想从 Kafka 中检索所有主题,并且我有不同的 spout 和 bolts。我想将每个主题发送到相应的 spout 和关联的 bolt(例如,对于 topic1,我有相应的 spout1 和 bolt1,对于 topic2,我有相应的spout2 和 bolt2 等等..)
我该怎么做?

4

2 回答 2

3

我创建了一个演示 kafka spout 项目,您应该可以将其用作起点:https ://github.com/buildlackey/cep/tree/master/storm%2Bkafka 。

问候 /

克里斯

于 2013-10-30T20:38:24.950 回答
0

虽然我不完全了解您正在尝试做什么(您是否为每个主题运行单独的拓扑?)通常您可以做的是,在您的 spout1 中创建一个消费者,该消费者将订阅 topic1 并尽快发出值它收到一个。然后将输出链接到相应的螺栓以供进一步执行。

但据我了解,您应该看看githubstorm -contrib 项目下的KafkaSpout实现。它基本上是一个从 Kafka 集群读取的 spout 实现,您所需要的只是正确创建配置。

从文档来看,它基本上看起来像这样

    SpoutConfig spoutConfig = new SpoutConfig(
               ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
               8, // number of partitions per host
               "clicks", // topic to read from
               "/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets
               "discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

这里要提到的一件事是上面的实现使用了 Kafka 0.7,所以如果你使用最新的(0.8,你应该)实现,你可以在这里找到 0.8 支持

于 2013-09-23T12:55:10.557 回答