问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
612 浏览

python-2.7 - 卡夫卡消费者滞后不是零

我有一个有 40 个分区的 Kafka 集群,我发布了几百万条消息。我有一个消费者群体,大部分消费者滞后<100。

在我的实验之后,我停止发布任何新消息并让消费者流失。

但是,我认为消费者滞后不会降到零。对于不同的分区,它在 0-200 之间敲击。有什么理由他们不会是零?

我正在为生产者和消费者使用融合的 Kafka python。

0 投票
0 回答
382 浏览

apache-kafka - 无法从我的本地机器生成和使用 kafka 消息到 ubuntu VM

我正在尝试从本地机器向 Ubuntu vm 生成 kafka 消息。

Telnet commad 表示成功连接到主机。

我正在使用以下代码,此代码在 Ubuntu vm 上运行良好,但当我尝试从主机执行相同代码时出现错误。

获取 KafkaTimeoutError: KafkaTimeoutError: 60.0 秒后无法更新元数据。

以下是调试日志。

0 投票
0 回答
101 浏览

apache-kafka - confluent-kafka python 库不适用于 ubutu14 和 python3

我正在使用“confluent-kafka==1.0.1”。当我使用 py3 和 ubuntu18 时它工作正常,但使用 py3 和 ubuntu14 失败。我收到以下错误。

0 投票
1 回答
1375 浏览

python - 将 confluent-kafka python 库与 AWS lambda 一起使用时出错

我正在尝试使用confluent-kafka python 库通过 lambda 函数管理我的集群,但该函数失败并出现错误:

我的要求.txt

为了创建 zip 文件,我将代码移动到虚拟环境的站点包位置并压缩了所有内容。

蟒蛇代码:

我正在使用 macOS 10.X。在 Linux 上,我注意到 pip install 创建了一个单独的 confluent_kafka.libs,它不会在 mac 上创建

0 投票
0 回答
204 浏览

python-3.x - Python 3 Confluent Kafka 在单个函数中运行生产者和消费者

我正在尝试在单个函数中运行 Producer 和 Consumer。

流程如下:

消息到达 --> 从主题 1 消费 --> 转换 --> 生产到主题 2

如果我以这种模式实例化生产者和消费者(先是生产者,然后是消费者),它可以正常工作。

但是如果我颠倒顺序并先实例化消费者,然后实例化生产者,

我得到错误:

{"level": "ERROR", "message": "Kafka 消费者错误:KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="JoinGroup failed: Broker: Group authorization failed"}"}

我只是想知道这是否按预期和可能的原因起作用。

0 投票
1 回答
1053 浏览

apache-kafka - 重新启动后,Kafka Consumer 未使用上次提交的偏移量

我有一个来自订阅主题的消费者轮询。它消耗每条消息并进行一些处理(在几秒钟内),推送到不同的主题并提交偏移量。

总共有5000条消息,

重启前 - 消耗了 2900 条消息并提交了偏移量

重新启动后 - 从偏移量 0 开始消耗。

即使消费者是使用相同的消费者组创建的,它也会从偏移量 0 开始处理消息。

kafka 版本 (strimzi) > 2.0.0 kafka-python == 2.0.1

0 投票
1 回答
397 浏览

python - 来自 confluent_kafka 的 AvroConsumer 引发“'dict' 对象没有属性 'get_by_id'”

AvroConsumer的 from 模块confluent_kafka.avro总是'dict' object has no attribute 'get_by_id'在轮询时引发。

虽然,当我使用简单Consumer的 from进行轮询时,confluent_kafka我得到了二进制序列化。
ccloud CLI 也可以很好地使用 Kafka。

知道为什么confluent_kafka客户端不工作吗?是因为我的配置吗?
我用confluent-kafka==1.5.0.

这是我的python代码示例:

这引发了:

作为线索,我还想明确我轮询的所有序列化消息\x00\x00\x01\x86\xa1\都以我手动反序列化数据时必须摆脱的奇怪字节开头。

谢谢你的帮助!

0 投票
1 回答
476 浏览

apache-kafka - 在 Confluent Kafka Python 客户端中指定复制因子

我已经设置了 Kafka 的单个代理实例以及 Zookeeper、Kafka-tools、Schema-registry 和控制中心。设置是使用 docker compose 和 Confluent 提供的图像完成的。这是 docker-compose 的样子:

尝试使用 confluent-kafka python 客户端创建生产者和消费者应用程序。以下是我的 Kafka 生产者的代码:

现在,当我执行代码时,它应该创建一个 Kafka 主题并将 JSON 数据推送到该主题,但这似乎不起作用,它一直显示一个错误,即当只有一个 broker.is 时指定了复制因子 3有一种方法可以在上面的代码中定义复制因子。当使用 Kafka cli 执行相同操作时,kafka borker 可以完美地工作。有什么我想念的吗?

0 投票
0 回答
261 浏览

apache-kafka - Python confluent kafka 在代理连接断开时引发异常

我正在使用 python 3.7 和 confluent-kafka。

以下是我用来轮询 kafka 服务器并阅读消息的伪代码。

如果 kafka 服务器由于某种原因关闭或断开连接,我希望抛出异常。

目前,如果发生上述情况,while 循环会继续运行而不会出现任何错误并打印No msg

如何获得异常或检查每次循环中是否可以连接 kafka 服务器(如果要进行一些检查,它应该是轻量级的)

0 投票
0 回答
138 浏览

python - confluent-kafka-python 在生产者的飞行中没有捕捉到超时的元数据请求

嗨,我们的 kafka 已经崩溃了大约 2 天,而我们的一个使用 confluent kafka for python 的生产者仍然尝试生成一条消息,而我们收到的日志说Timed out MetadataRequest in flightMetadata request failed: periodic topic and broker list refresh: Local: Host Resolution failure

现在我们的问题是生产者没有像任何其他回调和异常一样捕获这些错误,并且在我们重新启动生产者之前它就这样卡住了。

您能帮我们找出为什么它没有像我们在交付时使用回调和 error_cb 的任何其他日志一样捕获这些日志,并且每个函数都包含异常。

还有是什么导致了这个错误,直到现在我们还没有收到这个错误。

另外,也许您知道我们如何重现此错误,以便我们可以尝试在我们的 on 上调试它,

谢谢!