6

尝试编写使用来自 Kafka 的消息的 Spark Streaming 作业。这是我到目前为止所做的:

  1. 启动 Zookeeper
  2. 启动 Kafka 服务器
  3. 向服务器发送了一些消息。当我运行以下命令时,我可以看到它们:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
    
  4. 现在尝试编写一个程序来计算 5 分钟内收到的消息数。

代码看起来像这样:

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
        sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

不确定第三个参数(消费者组)使用什么值。当我运行它时,我得到Unable to connect to zookeeper server. 但是 Zookeeper 在端口上运行2181;否则,第 3 步将无法正常工作。

好像我没有KafkaUtils.createStream正确使用。有任何想法吗?

4

5 回答 5

2

没有默认消费者组这样的东西。您可以在那里使用任意非空字符串。如果你只有一个消费者,那么它的消费者群体并不重要。如果有两个或多个消费者,他们可以是同一个消费者组的一部分,也可以属于不同的消费者组。

来自http://kafka.apache.org/documentation.html

消费者

...

如果所有消费者实例都具有相同的消费者组,那么这就像传统的队列平衡消费者负载一样。

如果所有消费者实例都有不同的消费者组,那么这就像发布订阅一样,所有消息都广播给所有消费者。

我认为问题可能出在“主题”参数中。来自Spark 文档

要使用的 (topic_name -> numPartitions) 映射。每个分区都在自己的线程中使用

您只为主题指定了一个分区,即“1”。根据代理的设置(num.partitions),可能有多个分区,并且您的消息可能会发送到您的程序未读取的其他分区。

此外,我相信 partitionIds 是基于 0 的。因此,如果您只有一个分区,它的 id 将等于 0。

于 2014-11-04T07:51:12.203 回答
0

我认为您应该为 zookeeper 而不是 localhost 指定 ip。此外,第三个参数是消费者组名称。它可以是任何你喜欢的名字。当您有多个消费者绑定到同一组时,主题分区会相应分布。您的推文应该是:

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "x.x.x.x", "dummy-group", map);
于 2015-07-10T13:49:13.953 回答
0

我面临着同样的问题。这是对我有用的解决方案。

  • 分配给 Spark Streaming 应用程序的核心数量必须大于接收器的数量。否则系统将接收数据,但无法处理它。因此 Spark Streaming 至少需要两个核心。所以在我的 spark-submit 中,我应该提到至少两个核心。
  • kafka-clients-version.jar 应该包含在 spark-submit 的依赖 jar 列表中。
于 2015-09-28T07:44:39.140 回答
0

如果 zookeeper 与您的流应用程序在同一台机器上运行,则“localhost:2181”将起作用。否则,您必须提及运行 zookeeper 的主机的地址,并确保运行流式应用程序的机器能够与端口 2181 上的 zookeeper 主机通信。

于 2016-05-25T05:53:48.103 回答
-2

我认为,在您的代码中,调用 KafkaUtils.createStream 的第二个参数应该是 kafka 服务器的主机:端口,而不是 zookeeper 主机和端口。检查一次。

编辑: Kafka Utils API 文档

根据上面的文档,它应该是 zookeeper quorum 。所以应该使用 Zookeeper 主机名和端口。

zkQuorum Zookeeper 仲裁(主机名:端口,主机名:端口,..)。

于 2015-06-13T11:12:46.813 回答