问题标签 [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 如何获取 kafka 主题分区的最新偏移量?
我正在为 Kafka 使用 Python 高级消费者,并且想知道主题的每个分区的最新偏移量。但是我无法让它工作。
但我得到的输出是
我有另一种方法使用assign
,但结果是一样的
从一些文档看来,如果 afetch
尚未发布,我可能会得到这种行为。但我找不到强制执行的方法。我究竟做错了什么?
或者是否有不同/更简单的方法来获取主题的最新偏移量?
python - 使用 pykafka 在主题的特定分区上发布
如何在pykafka
主题的特定分区上发布消息。在下面的一段代码中,测试主题有四个分区,我打算在其中一个分区中写入每条消息,但显然它不是那样工作的。
apache-kafka - Kafka 0.9.0 单个分区上的多个 Python 消费者线程
对于上下文,我正在尝试将我们的 python 工作进程转移到基于 kafka (0.9.0) 的架构,但我对分区相对于消费者线程的限制感到困惑。分区上有多个消费者会导致同一分区上的其他线程等待当前线程完成吗?
apache-kafka - kafka-python:生产者无法连接
kafka-python (1.0.0) 在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer 和 /usr/bin/kafka-console-consumer 工作正常。
Python应用程序以前也可以正常工作,但是在zookeeper重新启动后,它不再可以连接。
我正在使用文档中的简单示例:
我收到此错误:
单步执行(/usr/lib/python2.6/site-packages/kafka/client_async.py)时,我注意到第 270 行的评估结果为 false:
在我的情况下 self._metadata_refresh_in_progress 是假的,但 ttl() = 0;
同时 kafka-console-* 正在愉快地推送消息:
有什么建议吗?
apache-kafka - Kafka:在多服务器设置上创建主题时出现 org.apache.zookeeper.KeeperException$NoNodeException
我正在尝试Kafka-0.8.2.2
在不同的机器上设置具有 1 个生产者、1 个消费者和 3 个代理的多节点集群。
在创建主题时producer
,我收到错误消息org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
。完整的控制台输出可在此处获得。Kafka Producer
的日志中没有错误。
我用来运行的命令Kafka
是:
注意:Zookeeper 服务在所有服务器上运行,并且所有三个代理都在运行 Kafka 服务器(只有代理需要 Kafka 服务器。对吗?)。
我的producer.properties的配置如下:
以下是我用作参考的许多文章中的一些:
amazon-web-services - Zookeeper -Kafka:ConnectException - 连接被拒绝
我正在尝试在ubuntu EC2机器上设置 3 个 Kafka 代理。但我正在ConnectException
开始zookeeper
。security group
我的实例中的所有端口ec2
都已打开。
下面是堆栈跟踪:
下面是配置:
zookeeper.properties:
服务器属性:
我在实例的/etc/hosts中添加了服务器的公共 IP。我修改后的 /etc/hosts 如下:
中的唯一myid
条目/tmp/zookeper/myid
也正确输入。
我已遵循以下所有步骤:如何创建多节点 - AWS 上的 Kafka 的 MultiBroker 集群
docker - Docker 中的 Kafka 无法正常工作
我正在尝试将wurstmeister\kafka-docker
图像与 一起 使用docker-compose
,但我在连接所有内容时遇到了真正的问题。
我检查的所有帖子或问题似乎都没有任何问题,但坦率地说我迷路了。(并且在 SO 中至少有两个问题试图解决这个问题)
我认为问题在于我对docker
. 所以问题:
我可以从同一个 kafka 容器消费和生产,但是,当我尝试创建另一个容器(或将我的笔记本电脑与 python 客户端一起使用)时,我遇到了几个与advertised.host.name
参数相关的错误(在图像中这个参数是KAFKA_ADVERTISED_HOST_NAME
)
我已经尝试以多种方式设置此变量,但它根本不起作用。
所以我正在寻找一个权威的答案(即如何自动设置这些参数及其含义)如何设置docker-compose.yml
这是我的:
更新
根据@dnephin 的建议,我修改了start-kafka.sh
以下几行:
并KAFKA_ADVERTISED_HOST_NAME: "kafka"
从docker-compose.yml
我以规范的方式启动了容器:
两个容器都在运行:
后来我做了:
一切顺利。
检查 IP 地址:
然后我在两个不同的控制台中执行:
生产者:
一个消费者:
几乎立即,警告开始在屏幕上飞来飞去:
等等
在制作人的控制台中,我写了一些句子:
过了一会儿,我收到了这样的回复:
而在docker-compose logs
更新 2
我让它工作,至少,在docker-machine
:
首先,我定义了一个名为的变量docker-machine
:
然后,我修改docker-compose.yml
如下:
最后,在 的环境中docker-machine
,我执行:
但是在笔记本电脑中(我的意思是,不使用虚拟机,它不起作用)
python - PyKafka 偏移量提交
我正在使用以下脚本从 Kafka 主题中读取消息。我能够阅读消息。但是,偏移量不会提交。关于如何强制偏移提交的任何建议,或者目前可能阻止提交的任何建议?(注意:这基本上取自文档,您可以看到它与他们的平衡消费者示例非常相似。)
我正在使用 Python 2.7.8、pykafka 2.1.1 和 kafka 0.8.2.1。
谢谢你。
apache-kafka - AssertionError:未分配的分区
我试图通过设置偏移量来使用来自主题的数据但得到断言错误 -
错误:-