1

Kafka 0.8 效果很好。我能够使用 CLI 以及编写我自己的生产者/消费者!

检查 Zookeeper... 我看到所有为 0.8 成功创建的主题和分区。

卡夫卡 0.7 不工作!

为什么选择 Kafka 0.7?我正在使用为 Kafka 0.7 制作的 Storm 的 Kafka Spout。

首先,我只想为 Kafka 0.7 运行基于 CLI 的生产者/消费者,但我无法做到。我执行以下步骤:

  1. 我删除了 Zookeeper 中从我的 Kafka 0.8 创建的所有主题/分区等
  2. 我将 zoo.cfg 中的 dataDir 更改为指向不同的位置。
  3. 现在我启动 kafka 服务器 0.7。它成功启动。但是我不知道为什么它会再次注册我删除的代理主题?
  4. 现在我启动 Kafka Producer :

    bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic topicime & 它成功启动:[2013-06-28 14:06:05,521] INFO zookeeper 状态已更改 (SyncConnected) (org.I0Itec.zkclient. ZkClient) [2013-06-28 14:06:05,606] INFO 在 0:0 (kafka.producer.ProducerPool) 为代理 id = 0 创建异步生产者

  5. 是时候发送一些消息了,哎呀,我收到了这个错误:

    [2013-06-28 14:07:19,650] 信息从 0:0 断开连接(kafka.producer.SyncProducer)[2013-06-28 14:07:19,653] 错误连接尝试到 0:0 失败,下次尝试在 1 ms (kafka.producer.SyncProducer) java.net.ConnectException: 在 sun.nio.ch.Net.connect(Net.java:364) 处 sun.nio.ch.Net.connect0(Native Method) 处拒绝连接。 nio.ch.Net.connect(Net.java:356) 在 sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623) 在 kafka.producer.SyncProducer.connect(SyncProducer.scala:173) 在 kafka.producer .SyncProducer.getOrMakeConnection(SyncProducer.scala:196) 在 kafka.producer.SyncProducer.send(SyncProducer.scala:92) 在 kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135) 在 kafka.producer.async.DefaultEventHandler。发送(DefaultEventHandler.scala:58) 在 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44) 在 kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116) 在 scala.collection.immutable.Stream.foreach(Stream.scala :254) 在 kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70) 在 kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)

请注意,Zookeeper 已经在运行。

任何帮助将不胜感激。

编辑:

我什至没有看到在 zookeeper 中创建的主题。我正在运行以下命令:

bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic topicime

命令后一切正常,我收到以下消息:

[2013-06-28 14:30:17,614] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x13f805c6673004b, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2013-06-28 14:30:17,615] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2013-06-28 14:30:17,700] INFO Creating async producer for broker id = 0 at 0:0 (kafka.producer.ProducerPool)

但是现在当我输入要发送的字符串时,出现上述错误(连接被拒绝!)

4

2 回答 2

0
INFO Disconnecting from 0:0 (kafka.producer.SyncProducer) 

上面的行中隐藏了错误。0:0 不是有效的主机和端口。解决方案是通过在 server.properties 中设置“hostname”属性来显式设置要在 Zookeeper 中注册的主机 ip。

于 2013-07-08T09:17:20.930 回答
0

考虑查看storm-kafka fork,在https://github.com/wurstmeister/storm-kafka-0.8-plus

我现在正在为我们的服务器安装它 =)。

于 2014-04-09T00:26:51.217 回答