问题标签 [confluent-kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 如何使用 confluent-kafka-python 确定是否存在 kafka 主题
我正在使用 confluent-kafka-python 包与 Kafka 服务器进行交互。我可以成功创建主题并将事件推送给它。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。
谢谢你的帮助!
python - 如何以编程方式更新 Confluent Schema Registry 中的主题模式和兼容性
我已经在模式注册表中注册了一个模式,我可以register()
像这样使用它,
现在我需要用一个新字段来更新这个相同的主题,所以我会得到新的模式 id 和version = 2
.
我尝试使用sr.register(subject_name, updated_schema)
,它会为同一主题引发错误:
是的,此功能是注册新模式而不更新。我没有任何更新功能,我不知道该怎么做。那么如何更新架构?任何帮助,将不胜感激。
python - Confluent Kafka Python 错误:元数据请求失败
低于错误。不确定有什么问题。
kafka-consumer-api - 一些 Python Confluent Kafka 消费者保持空闲/未分配,即使其他人超载/过度分配
设置:
- 120 个 python confluent-kafka 消费者都在订阅同一组主题
- 8 个不同分区数的主题:1 个主题有 84 个分区,几个主题有 40-50 个分区,其余的有 1-10 个分区。分区总数约为 300 个。
我使用非常标准的订阅代码:
问题: 在我开始的 120 个消费者中,只有 84 个(与最大主题的分区数相同)获得分区分配 - 其他人没有任何分区分配,因此保持空闲状态。更糟糕的是,我通常会得到 5 个消费者,分配了 ~ 10 个分区,有些是 8 个,很多是 2-3-4,还有很多消费者只分配了一个分区。我相信订阅的“第一批”消费者会获得最多的主题,直到每个主题的可用分区都用完为止。
问题:
- 我阅读了
partition.assignment.strategy
Java 消费者可用的配置属性,但是我在 Confluent Kafka 客户端中找不到它。那么有没有办法在 Confluent Kafka Python Client 中配置分配策略? - 有没有办法在服务器上设置分区分配策略,或者每个主题或每个组 ID?
- 或者是否有不同的方式在所有消费者之间分配负载?
感谢您花时间阅读我的问题:)
python-3.x - 如何使用 key.deserializer 和 value.deserializer 在 Python Confluent_Kafka 中解密自定义加密消息?
我的团队正在使用 confluent_kafka 生成实时流,并使用自定义加密被推送到队列中。背后的技术是Java。
我正在尝试使用主题中的 python 来使用消息,并且能够这样做但无法使用 python 对其进行解密。
输出=b'\x85s\n\x0e{!G\xc7\xdfaV\x8e\x03a\xc4\x8e\xe1\xbc\xf4\xf4F\xb5<{\xecD\xc2\xf8jNN
我无法在值反序列化器中提供自定义加密密钥。以下是它在java中的实现方式:
如何使用 python 自定义解密来解密字符串格式?
apache-kafka - confluent-kafka Python 库 consumer.poll(timeout) 无法按预期工作
当我设置msg = consumer.poll(timeout=10.0)
消费者等待 10 秒并按None
预期返回时,但是当我将其更改msg = consumer.poll(timeout=3600.0)
为此消费者时,只需立即返回None
,而不是按预期等待 3600 秒。我在这里错过了什么吗?如果需要,这里是完整的代码。
python - 多处理 Python Kafka 消费者客户端没有收到消息
Python 消费者客户端在作为独立运行时运行良好,但在作为具有相同配置的多处理工作者运行时无法检索消息。
客户端总是在为 msg 获得 None 的块中打印消息。非常感谢诊断此问题的任何帮助。
工人基本上看起来像:
控制器看起来像:
客户端配置:
看起来消费者作为多处理工作者无法获得补偿:
python - 如何在融合的 kafka 中调试 AvroConsumer?
我正在尝试从 python 读取 Kafka,但收到的消息是 None ,CLI 中没有错误。我通过putty使用端口转发到目标主机,而不是通过telnet测试端口 - 它工作正常。此外,我在 Debian (WSL) 上使用 kafkacat,它工作得很好!
我正在使用 PyCharm,我的代码在文本下方。我该如何调试?
作为
apache-kafka - 从 kafka 消费,没有无限循环
我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且代码在while True
循环内运行良好,如文档中的示例所示。但是,我想设置一个每天只从主题中消耗一次的 cron 作业。这个想法是作业将在早上检查主题,在那个时间点消费主题中的所有消息然后停止。我尝试在 python 中实现这一点,如下所示:
问题在于它永远不会消耗任何东西。我猜第一行在第一次尝试时永远不会收到消息。它仅适用,while True
但我不希望此代码无限运行,直到该时间点的最后一条消息被消耗。