我曾在Storm工作并开发了一个使用本地文本文件作为输入源的基本程序。但现在我必须处理来自外部系统的连续流数据。为此,Kafka 是最佳选择。
问题是如何让我的 Spout 从 Kafka 获取流数据。或者如何将 Storm 与 Kafka 集成。我该怎么做才能处理来自 Kafka 的数据?
我曾在Storm工作并开发了一个使用本地文本文件作为输入源的基本程序。但现在我必须处理来自外部系统的连续流数据。为此,Kafka 是最佳选择。
问题是如何让我的 Spout 从 Kafka 获取流数据。或者如何将 Storm 与 Kafka 集成。我该怎么做才能处理来自 Kafka 的数据?
寻找KafkaSpout。
这是一个从 Kafka 集群读取的普通 Storm Spout 实现。您只需要使用 , 等参数配置该 spout list of brokers
。topic name
然后您可以简单地将输出链接到相应的螺栓以进行进一步处理。
从上面提到的同一个文档中,配置如下:
SpoutConfig spoutConfig = new SpoutConfig(
ImmutableList.of("kafkahost1", "kafkahost2"), // List of Kafka brokers
8, // Number of partitions per host
"clicks", // Topic to read from
"/kafkastorm", // The root path in Zookeeper for the spout to store the consumer offsets
"discovery"); // An id for this consumer for storing the consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);