问题标签 [confluent-kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 在 Kafka Python 中设置分区键表达式配置
我正在使用confluent 的 kafka python 包。我想在 Spring (Java) 中向 Producer 添加一个名为partition-key-expression的配置属性(有关更多信息,请参阅此参考)
我现在实例化生产者的方式如下:
我想知道是否可以添加partition-key-expression配置属性,因为我在文档中找不到它。
python - 尝试初始化kafka生产者时没有这样的配置属性:“schema.compatibility.level”
我正在使用融合卡夫卡。
我的代码是
运行这个时,我得到
即使使用旧avro.compatibility.level
参数也会引发相同的错误
它适用于其他参数,例如
"acks": "all","debug": "msg"
但不适用于兼容性级别。
请帮我解决这个问题。旧模式有很多变化,既不向前也不向后兼容。我需要将其设置为 None 。请帮忙。!
python - 无法从 confluent_kafka 导入生产者
我正在使用带有 python 3.9 和 confluent_kafka 库的 conda 环境(通过 pip install confluent-kafka 安装)。我还安装了 librdkafka。从 pycharm 我无法导入:
我收到此错误:
你能帮忙解决这个问题吗?
python-3.x - 尝试从远程主机向我的 kafka 主题发布消息
我在 Django 框架中使用 confluent-Kafka lib 来创建和使用我的 Kafka 客户端。我的 Kafka 代理当前正在 docker 容器内的远程 ESXI 机器 (Linux) 上运行。我也在使用 docker-compose 来构建和运行容器。
当在 ESXI 上运行相同的代码(主机和端口的 env 设置不同)时,我可以使用我的 kafka 客户端发布/读取消息,但是在我的本地 Windows 主机上运行我的 Django 服务器时,我' m 得到下一个错误:
这是我的 docker-compose 示例文件:
python - 在 SchemaRegistryClient Confluent Kafka 中禁用证书验证
所以,我想从 kafka (Confluent) 中阅读一个主题,其中数据位于 Avro 格式。
由于某些不可避免的原因,我想禁用证书验证。
我正在使用 security.protocol= SASL_SSL 和 sasl.mechanisms= OUTHBEARER
我可以通过禁用 ssl 证书验证来连接到 Kafka
现在,我在尝试使用 Schema 注册表反序列化值时遇到了问题。avro 解串器需要模式注册表客户端和模式(可选)。我两个都过去了。我通过使用 verify=False 发出单独的请求来获取第二个参数的值以获取架构,这很好。但是当我尝试从中创建一个反序列化消费者时,问题就出现了。
基本上是代码的骨架(以及问题所在的注释)
您在上面看到的 get_basic_configuration 方法有
错误是
我在这里浏览了 SchemaRegistryClient 的代码, 但我没有看到任何将证书验证设置为 false 的选项。
我还搜索了 SO 帖子和其他 Confluent 文档,看看是否能找到一些东西,但没有任何帮助
希望有人在这里了解更多?我愿意详细说明或澄清任何问题吗?如果可能的话,我试图不使用很多自定义逻辑来反序列化。
python - Kafka 偏移量手动提交不更新偏移量值
我从 Kafka 代理读取的 python 融合 kafka 代码如下所示
如上所见,提交前后的 msg.offset() 是相同的。我应该在执行 consumer.commit() 时也提交偏移值/分区还是我错过了什么
kerberos - Kerberos 身份验证窗口 | Confluent_kakfa python |尝试通过获取连接但获取失败初始化SASL身份验证错误
Kerberos 身份验证窗口 | Confluent_kakfa python |尝试通过获取连接但获取失败初始化SASL身份验证错误。我做了一些挖掘,结果发现 Windows 使用登录用户的凭据。请参阅下面的完整错误
任何解决方法,以便我们可以在这里使用 keytab?如果我没记错的话,这是正确的方法,对吧?
生产者配置:-
kerberos - 如何解决 No provider for SASL 机制 GSSAPI 的错误 Kafa Confluent Python
目前,我在使用 Kerberos 身份验证时遇到问题。我收到如下所述的错误
对此的任何帮助都非常感谢。谢谢!
python - confluent kafka producer avro schema error ClientError: Schema parse failed: Unknown named schema
我在 kafka 的生产者方面工作,以在主题中推送消息。我正在使用confluent-kafka
avro 生产者。
以下是我的架构.avsc
文件。
密钥.avsc
测试.avsc
生产者.py
当我尝试注册Keys.avsc
时,它工作正常,没有错误。Test.avsc
但是当我注册后尝试注册时Keys.avsc
。我得到以下错误。
confluent_kafka.avro.error.ClientError:模式解析失败:未知命名模式'io.codebrews.schema.test.Keys',已知名称:['io.codebrews.schema.test.Subscription']。
手动注册架构后。
在我的主题中推送消息时,出现以下错误。
ClientError: Incompatible Avro schema:409 message:{'error_code': 409, 'message': '正在注册的模式与主题“test-value”的早期模式不兼容。
我在这里做错了吗?
也有人可以帮助我如何在 python 中停止自动模式注册?
python - 如何在给定超时的kafka中查找消息
我有一个这样的测试场景:
- 将一些记录插入数据库
- 等待这些记录被 Kafka 处理
- 验证这些记录是否存在于 Kafka 主题中
第 2 步有时需要 5 秒。有时需要 2 分钟。
第一步返回一些我想在第 2 步(Kafka 主题)中查找的 ID。我想要实现的是检查这些 ID(从第一步开始)是否在给定的超时(例如 20 秒)内在 Kafka 中处理并继续执行其他步骤,例如:
从 Kafka 获取所有消息并检查这些 ID 是否在那里,如果没有,那么再过一个例如 20 秒。获取这些消息并检查 ID 是否在新获取的消息中。如果找到所有 ID,则返回这些消息的列表(在达到超时之前)或在达到超时之后 - 返回已获取消息的列表。
希望我解释得很好,因为它可能有点棘手,我想知道是否有可能完成它。
到目前为止,我只知道这样的事情。
有什么更好的方法吗?也许有线程,装饰器?我只是不知道如何检查列表中是否存在某些内容-如果是,则返回某些内容,如果不存在-再试一次,依此类推……直到达到超时。