问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2687 浏览

python - 如何使用 confluent-kafka-python 确定是否存在 kafka 主题

我正在使用 confluent-kafka-python 包与 Kafka 服务器进行交互。我可以成功创建主题并将事件推送给它。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。

谢谢你的帮助!

0 投票
1 回答
1125 浏览

python - 如何以编程方式更新 Confluent Schema Registry 中的主题模式和兼容性

我已经在模式注册表中注册了一个模式,我可以register()像这样使用它,

现在我需要用一个新字段来更新这个相同的主题,所以我会得到新的模式 id 和version = 2.

我尝试使用sr.register(subject_name, updated_schema),它会为同一主题引发错误:

是的,此功能是注册新模式而不更新。我没有任何更新功能,我不知道该怎么做。那么如何更新架构?任何帮助,将不胜感激。

0 投票
1 回答
850 浏览

python - Confluent Kafka Python 错误:元数据请求失败

低于错误。不确定有什么问题。

0 投票
1 回答
637 浏览

kafka-consumer-api - 一些 Python Confluent Kafka 消费者保持空闲/未分配,即使其他人超载/过度分配

设置:

  • 120 个 python confluent-kafka 消费者都在订阅同一组主题
  • 8 个不同分区数的主题:1 个主题有 84 个分区,几个主题有 40-50 个分区,其余的有 1-10 个分区。分区总数约为 300 个。

我使用非常标准的订阅代码:

问题: 在我开始的 120 个消费者中,只有 84 个(与最大主题的分区数相同)获得分区分配 - 其他人没有任何分区分配,因此保持空闲状态。更糟糕的是,我通常会得到 5 个消费者,分配了 ~ 10 个分区,有些是 8 个,很多是 2-3-4,还有很多消费者只分配了一个分区。我相信订阅的“第一批”消费者会获得最多的主题,直到每个主题的可用分区都用完为止。

问题:

  1. 我阅读了partition.assignment.strategyJava 消费者可用的配置属性,但是我在 Confluent Kafka 客户端中找不到它。那么有没有办法在 Confluent Kafka Python Client 中配置分配策略?
  2. 有没有办法在服务器上设置分区分配策略,或者每个主题或每个组 ID?
  3. 或者是否有不同的方式在所有消费者之间分配负载?

感谢您花时间阅读我的问题:)

0 投票
1 回答
705 浏览

python-3.x - 如何使用 key.deserializer 和 value.deserializer 在 Python Confluent_Kafka 中解密自定义加密消息?

我的团队正在使用 confluent_kafka 生成实时流,并使用自定义加密被推送到队列中。背后的技术是Java。

我正在尝试使用主题中的 python 来使用消息,并且能够这样做但无法使用 python 对其进行解密。

输出=b'\x85s\n\x0e{!G\xc7\xdfaV\x8e\x03a\xc4\x8e\xe1\xbc\xf4\xf4F\xb5<{\xecD\xc2\xf8jNN

我无法在值反序列化器中提供自定义加密密钥。以下是它在java中的实现方式:

如何使用 python 自定义解密来解密字符串格式?

0 投票
1 回答
1017 浏览

apache-kafka - confluent-kafka Python 库 consumer.poll(timeout) 无法按预期工作

当我设置msg = consumer.poll(timeout=10.0)消费者等待 10 秒并按None预期返回时,但是当我将其更改msg = consumer.poll(timeout=3600.0)为此消费者时,只需立即返回None,而不是按预期等待 3600 秒。我在这里错过了什么吗?如果需要,这里是完整的代码。

0 投票
0 回答
448 浏览

python - 多处理 Python Kafka 消费者客户端没有收到消息

Python 消费者客户端在作为独立运行时运行良好,但在作为具有相同配置的多处理工作者运行时无法检索消息。

客户端总是在为 msg 获得 None 的块中打印消息。非常感谢诊断此问题的任何帮助。

工人基本上看起来像:

控制器看起来像:

客户端配置:

看起来消费者作为多处理工作者无法获得补偿:

0 投票
1 回答
787 浏览

python - 如何在融合的 kafka 中调试 AvroConsumer?

我正在尝试从 python 读取 Kafka,但收到的消息是 None ,CLI 中没有错误。我通过putty使用端口转发到目标主机,而不是通过telnet测试端口 - 它工作正常。此外,我在 Debian (WSL) 上使用 kafkacat,它工作得很好!

我正在使用 PyCharm,我的代码在文本下方。我该如何调试?

作为

0 投票
0 回答
220 浏览

python - 数据丢失Producer->Kafka->Nifi

我从 csv 文件向 kafka 写入消息。我的制作人说所有数据都生成到 Kafka 主题。

除此之外,我使用 apache nifi 作为消费者到 kafka 主题(ConsumeKafka_2_0 处理器)。

如果我将数据生成到 kafka 到一个流中 - 一切都很好,但是如果我尝试使用多个生产者并行处理多个文件,我会丢失很多行。

我的制片人的核心如下:

Nifi 处理器属性的屏幕截图: 在此处输入图像描述

在此处输入图像描述

Kafka 包含 3 个节点,主题复制因子 - 3

可能问题出在生产者写比消费者读快,某个块溢出时数据被删除?

请给我建议。

0 投票
1 回答
870 浏览

apache-kafka - 从 kafka 消费,没有无限循环

我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且代码在while True循环内运行良好,如文档中的示例所示。但是,我想设置一个每天只从主题中消耗一次的 cron 作业。这个想法是作业将在早上检查主题,在那个时间点消费主题中的所有消息然后停止。我尝试在 python 中实现这一点,如下所示:

问题在于它永远不会消耗任何东西。我猜第一行在第一次尝试时永远不会收到消息。它仅适用,while True但我不希望此代码无限运行,直到该时间点的最后一条消息被消耗。