问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
6 回答
47912 浏览

python - 如何获取 kafka 主题分区的最新偏移量?

我正在为 Kafka 使用 Python 高级消费者,并且想知道主题的每个分区的最新偏移量。但是我无法让它工作。

但我得到的输出是

我有另一种方法使用assign,但结果是一样的

从一些文档看来,如果 afetch尚未发布,我可能会得到这种行为。但我找不到强制执行的方法。我究竟做错了什么?

或者是否有不同/更简单的方法来获取主题的最新偏移量?

0 投票
1 回答
1399 浏览

python - 使用 pykafka 在主题的特定分区上发布

如何在pykafka主题的特定分区上发布消息。在下面的一段代码中,测试主题有四个分区,我打算在其中一个分区中写入每条消息,但显然它不是那样工作的。

0 投票
1 回答
786 浏览

apache-kafka - Kafka 0.9.0 单个分区上的多个 Python 消费者线程

对于上下文,我正在尝试将我们的 python 工作进程转移到基于 kafka (0.9.0) 的架构,但我对分区相对于消费者线程的限制感到困惑。分区上有多个消费者会导致同一分区上的其他线程等待当前线程完成吗?

0 投票
7 回答
26934 浏览

apache-kafka - kafka-python:生产者无法连接

kafka-python (1.0.0) 在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer 和 /usr/bin/kafka-console-consumer 工作正常。

Python应用程序以前也可以正常工作,但是在zookeeper重新启动后,它不再可以连接。

我正在使用文档中的简单示例:

我收到此错误:

单步执行(/usr/lib/python2.6/site-packages/kafka/client_async.py)时,我注意到第 270 行的评估结果为 false:

在我的情况下 self._metadata_refresh_in_progress 是假的,但 ttl() = 0;

同时 kafka-console-* 正在愉快地推送消息:

有什么建议吗?

0 投票
2 回答
3831 浏览

apache-kafka - Kafka:在多服务器设置上创建主题时出现 org.apache.zookeeper.KeeperException$NoNodeException

我正在尝试Kafka-0.8.2.2在不同的机器上设置具有 1 个生产者、1 个消费者和 3 个代理的多节点集群。

在创建主题时producer,我收到错误消息org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids。完整的控制台输出可在此处获得。Kafka Producer的日志中没有错误。

我用来运行的命令Kafka是:

注意:Zookeeper 服务在所有服务器上运行,并且所有三个代理都在运行 Kafka 服务器(只有代理需要 Kafka 服务器。对吗?)。

我的producer.properties的配置如下:

以下是我用作参考的许多文章中的一些:

0 投票
1 回答
3160 浏览

amazon-web-services - Zookeeper -Kafka:ConnectException - 连接被拒绝

我正在尝试在ubuntu EC2机器上设置 3 个 Kafka 代理。但我正在ConnectException开始zookeepersecurity group我的实例中的所有端口ec2都已打开。

下面是堆栈跟踪:

下面是配置:

zookeeper.properties

服务器属性

我在实例的/etc/hosts中添加了服务器的公共 IP。我修改后的 /etc/hosts 如下:

中的唯一myid条目/tmp/zookeper/myid也正确输入。

我已遵循以下所有步骤:如何创建多节点 - AWS 上的 Kafka 的 MultiBroker 集群

0 投票
10 回答
67836 浏览

docker - Docker 中的 Kafka 无法正常工作

我正在尝试将wurstmeister\kafka-docker图像与 一起 使用docker-compose,但我在连接所有内容时遇到了真正的问题。

我检查的所有帖子或问题似乎都没有任何问题,但坦率地说我迷路了。(并且在 SO 中至少有两个问题试图解决这个问题)

我认为问题在于我对docker. 所以问题:

我可以从同一个 kafka 容器消费和生产,但是,当我尝试创建另一个容器(或将我的笔记本电脑与 python 客户端一起使用)时,我遇到了几个与advertised.host.name参数相关的错误(在图像中这个参数是KAFKA_ADVERTISED_HOST_NAME

我已经尝试以多种方式设置此变量,但它根本不起作用。

所以我正在寻找一个权威的答案(即如何自动设置这些参数及其含义)如何设置docker-compose.yml

这是我的:

更新

根据@dnephin 的建议,我修改了start-kafka.sh以下几行:

KAFKA_ADVERTISED_HOST_NAME: "kafka"docker-compose.yml

我以规范的方式启动了容器:

两个容器都在运行:

后来我做了:

一切顺利。

检查 IP 地址:

然后我在两个不同的控制台中执行:

生产者:

一个消费者:

几乎立即,警告开始在屏幕上飞来飞去:

等等

在制作人的控制台中,我写了一些句子:

过了一会儿,我收到了这样的回复:

而在docker-compose logs

更新 2

我让它工作,至少,在docker-machine

首先,我定义了一个名为的变量docker-machine

然后,我修改docker-compose.yml如下:

最后,在 的环境中docker-machine,我执行:

但是在笔记本电脑中(我的意思是,不使用虚拟机,它不起作用)

0 投票
0 回答
1489 浏览

python - PyKafka 偏移量提交

我正在使用以下脚本从 Kafka 主题中读取消息。我能够阅读消息。但是,偏移量不会提交。关于如何强制偏移提交的任何建议,或者目前可能阻止提交的任何建议?(注意:这基本上取自文档,您可以看到它与他们的平衡消费者示例非常相似。)

我正在使用 Python 2.7.8、pykafka 2.1.1 和 kafka 0.8.2.1。

谢谢你。

0 投票
3 回答
5474 浏览

apache-kafka - AssertionError:未分配的分区

我试图通过设置偏移量来使用来自主题的数据但得到断言错误 -

错误:-

0 投票
2 回答
890 浏览

python-2.7 - 无法使用端口 9092 连接到 Kafka

我在 kafka 中创建了一个具有 replica=2 的主题,并且 kafka 正在我的 docker 机器中运行。 在此处输入图像描述

通常,我应该能够使用端口 9092 连接到 docker。但我不能

在此处输入图像描述

虽然,我可以使用 32783 连接。不知道是什么原因?

在此处输入图像描述