0

kafka_2.11-0.10.1.1使用confluent-kafka-0.9.2(主分支)python 绑定运行,它使用librdkafka-0.9.2. 我的机器运行 ubuntu-16.04 x86_64。我zookeeper-3.4.8-1在端口上运行2181。我像这样运行融合生产者示例:

$ cd confluent-kafka-python/examples
$ python producer.py localhost:9095 confluent-01
first message
2nd msg

消费者

$ python consumer.py localhost:9095 confluentgroup confluent-01

一切都在我的机器上本地运行,它不运行任何防火墙。

备注

  • 在 Zookeeper 上成功创建主题
  • broker 成功接收到 producer 消息:
  • 消费者设置以下conf{'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, 'api.version.request': True }
  • 一开始producer/consumer工作得很好一段时间,直到我Receive failed: Disconnected成为制片人。摘录:

$ python producer.py  localhost:9095 confluent-02
asd
% Message delivered to confluent-02 [0]
1234123
890890
% Message delivered to confluent-02 [0]
%3|1485791262.420|FAIL|rdkafka#producer-1| [thrd:obscura.ax.example.com:9095/3]: obscura.ax.example.com:9095/3: Receive failed: Disconnected

问题:一段时间后我在消费者方面没有得到任何东西

问题:

  1. 我究竟做错了什么?
  2. 如何验证代理端已收到生产者消息? 生产者消息在代理端正确接收。
  3. 如何调试消费者端? 我添加'debug': "cgrp, topic, fetch"到消费者配置文件中。我在哪里可以阅读日志?
4

2 回答 2

1

我有两个建议:

1)尝试将选项--from-beginning添加到消费者命令

2) 代理的默认端口是 9092,因此请检查要使用的正确端口

希望这可以帮助。

于 2017-01-27T23:30:45.633 回答
1

我最终使事情顺利进行。最初我运行了confluent-kafka 教程,其中:

  • 不捕获ctrl+cSIGINT 信号,
  • 在中时不会超时poll()

在消费者代码中。因此我不得不ctrl+zkill %1我的 linux 机器上使用它。我相信这个终止并没有关闭套接字,它保持打开了一段时间(TIME_WAIT)。然后当我重新启动消费者时,它从旧套接字中取出垃圾并卡住了。

我添加try: [...] except KeyboardInterrupt: consumer.close()了捕捉ctrl+c并干净地关闭套接字。并且不再面临这个问题。

我希望这对将来的某人有所帮助。

于 2017-01-31T14:54:40.563 回答