问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
69 浏览

python - 在 Kafka Python 中设置分区键表达式配置

我正在使用confluent 的 kafka python 包。我想在 Spring (Java) 中向 Producer 添加一个名为partition-key-expression的配置属性(有关更多信息,请参阅此参考

我现在实例化生产者的方式如下:

我想知道是否可以添加partition-key-expression配置属性,因为我在文档中找不到它。

0 投票
1 回答
59 浏览

python - 尝试初始化kafka生产者时没有这样的配置属性:“schema.compatibility.level”

我正在使用融合卡夫卡。

我的代码是

运行这个时,我得到

即使使用旧avro.compatibility.level参数也会引发相同的错误

它适用于其他参数,例如 "acks": "all","debug": "msg"但不适用于兼容性级别。

请帮我解决这个问题。旧模式有很多变化,既不向前也不向后兼容。我需要将其设置为 None 。请帮忙。!

0 投票
2 回答
964 浏览

python - 无法从 confluent_kafka 导入生产者

我正在使用带有 python 3.9 和 confluent_kafka 库的 conda 环境(通过 pip install confluent-kafka 安装)。我还安装了 librdkafka。从 pycharm 我无法导入:

我收到此错误:

你能帮忙解决这个问题吗?

0 投票
0 回答
12 浏览

python-3.x - 尝试从远程主机向我的 kafka 主题发布消息

我在 Django 框架中使用 confluent-Kafka lib 来创建和使用我的 Kafka 客户端。我的 Kafka 代理当前正在 docker 容器内的远程 ESXI 机器 (Linux) 上运行。我也在使用 docker-compose 来构建和运行容器。

当在 ESXI 上运行相同的代码(主机和端口的 env 设置不同)时,我可以使用我的 kafka 客户端发布/读取消息,但是在我的本地 Windows 主机上运行我的 Django 服务器时,我' m 得到下一个错误:

链接到错误的图像

这是我的 docker-compose 示例文件:

0 投票
1 回答
436 浏览

python - 在 SchemaRegistryClient Confluent Kafka 中禁用证书验证

所以,我想从 kafka (Confluent) 中阅读一个主题,其中数据位于 Avro 格式。

由于某些不可避免的原因,我想禁用证书验证。

我正在使用 security.protocol= SASL_SSL 和 sasl.mechanisms= OUTHBEARER

我可以通过禁用 ssl 证书验证来连接到 Kafka

现在,我在尝试使用 Schema 注册表反序列化值时遇到了问题。avro 解串器需要模式注册表客户端和模式(可选)。我两个都过去了。我通过使用 verify=False 发出单独的请求来获取第二个参数的值以获取架构,这很好。但是当我尝试从中创建一个反序列化消费者时,问题就出现了。

基本上是代码的骨架(以及问题所在的注释)

您在上面看到的 get_basic_configuration 方法有

错误是

我在这里浏览了 SchemaRegistryClient 的代码, 但我没有看到任何将证书验证设置为 false 的选项。

我还搜索了 SO 帖子和其他 Confluent 文档,看看是否能找到一些东西,但没有任何帮助

希望有人在这里了解更多?我愿意详细说明或澄清任何问题吗?如果可能的话,我试图不使用很多自定义逻辑来反序列化。

0 投票
1 回答
111 浏览

python - Kafka 偏移量手动提交不更新偏移量值

我从 Kafka 代理读取的 python 融合 kafka 代码如下所示

如上所见,提交前后的 msg.offset() 是相同的。我应该在执行 consumer.commit() 时也提交偏移值/分区还是我错过了什么

0 投票
0 回答
207 浏览

kerberos - Kerberos 身份验证窗口 | Confluent_kakfa python |尝试通过获取连接但获取失败初始化SASL身份验证错误

Kerberos 身份验证窗口 | Confluent_kakfa python |尝试通过获取连接但获取失败初始化SASL身份验证错误。我做了一些挖掘,结果发现 Windows 使用登录用户的凭据。请参阅下面的完整错误

任何解决方法,以便我们可以在这里使用 keytab?如果我没记错的话,这是正确的方法,对吧?

生产者配置:-

0 投票
0 回答
117 浏览

kerberos - 如何解决 No provider for SASL 机制 GSSAPI 的错误 Kafa Confluent Python

目前,我在使用 Kerberos 身份验证时遇到问题。我收到如下所述的错误

对此的任何帮助都非常感谢。谢谢!

0 投票
1 回答
303 浏览

python - confluent kafka producer avro schema error ClientError: Schema parse failed: Unknown named schema

我在 kafka 的生产者方面工作,以在主题中推送消息。我正在使用confluent-kafkaavro 生产者。

github上喜欢的问题

以下是我的架构.avsc文件。

密钥.avsc

测试.avsc

生产者.py

当我尝试注册Keys.avsc时,它工作正常,没有错误。Test.avsc但是当我注册后尝试注册时Keys.avsc。我得到以下错误。

confluent_kafka.avro.error.ClientError:模式解析失败:未知命名模式'io.codebrews.schema.test.Keys',已知名称:['io.codebrews.schema.test.Subscription']。

手动注册架构后。

在我的主题中推送消息时,出现以下错误。

ClientError: Incompatible Avro schema:409 message:{'error_code': 409, 'message': '正在注册的模式与主题“test-value”的早期模式不兼容。

我在这里做错了吗?

也有人可以帮助我如何在 python 中停止自动模式注册?

0 投票
0 回答
35 浏览

python - 如何在给定超时的kafka中查找消息

我有一个这样的测试场景:

  1. 将一些记录插入数据库
  2. 等待这些记录被 Kafka 处理
  3. 验证这些记录是否存在于 Kafka 主题中

第 2 步有时需要 5 秒。有时需要 2 分钟。
第一步返回一些我想在第 2 步(Kafka 主题)中查找的 ID。我想要实现的是检查这些 ID(从第一步开始)是否在给定的超时(例如 20 秒)内在 Kafka 中处理并继续执行其他步骤,例如:
从 Kafka 获取所有消息并检查这些 ID 是否在那里,如果没有,那么再过一个例如 20 秒。获取这些消息并检查 ID 是否在新获取的消息中。如果找到所有 ID,则返回这些消息的列表(在达到超时之前)或在达到超时之后 - 返回已获取消息的列表。

希望我解释得很好,因为它可能有点棘手,我想知道是否有可能完成它。

到目前为止,我只知道这样的事情。

有什么更好的方法吗?也许有线程,装饰器?我只是不知道如何检查列表中是否存在某些内容-如果是,则返回某些内容,如果不存在-再试一次,依此类推……直到达到超时。