尝试编写使用来自 Kafka 的消息的 Spark Streaming 作业。这是我到目前为止所做的:
- 启动 Zookeeper
- 启动 Kafka 服务器
向服务器发送了一些消息。当我运行以下命令时,我可以看到它们:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning现在尝试编写一个程序来计算 5 分钟内收到的消息数。
代码看起来像这样:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不确定第三个参数(消费者组)使用什么值。当我运行它时,我得到Unable to connect to zookeeper server. 但是 Zookeeper 在端口上运行2181;否则,第 3 步将无法正常工作。
好像我没有KafkaUtils.createStream正确使用。有任何想法吗?