问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
32 浏览

python - 使用 Structlog 获取最后一个日志值并将其作为变量传递给函数

目前我正在尝试实现一个函数调用,该函数调用将失败的消息从转换器发送到带有 Kafka 的 DLQ 主题。作为 DLQ 消息的一部分,我想包含我们也记录的异常错误。

编码:

我需要获取最新 log.error() 调用的值并将其分配给变量:error_message

我在另一个异常块中调用了这个完全相同的函数,但它在 log.error() 调用中具有不同的值,所以我需要一些能够获取最后/最新错误消息的东西。

0 投票
1 回答
375 浏览

pykafka - confluent-kafka-python 库:每个消费者组每个主题的读取偏移量

由于pykafka EOL,我们正在迁移到confluent-kafka-python。因为pykafka我们编写了一个详细的脚本,它以以下格式生成输出:

话题 消费群体 抵消
topic_alpha total_messages 100
topic_alpha 消费者_a 10
topic_alpha 消费者_b 25

我想知道是否有一个 Python 代码知道如何为confluent-kafka-python?

小字:有一个关于如何读取每个给定 consumer_group 的偏移量的部分示例。但是,我很难在consumer_group不手动解析的情况下获取每个主题的列表__consumer_offsets

0 投票
1 回答
423 浏览

python - 汇合模式注册表 - AvroProducer - 连接错误

我正在尝试使用 AvroProducer 的confluent_kafka类以 Avro 格式生成消息。Kafka 和 Schema-Registry 在同一网络中作为 3 个节点的集群运行。

我正在从字符串中读取模式并初始化 AvroProducer,如下所示:

schema-registry 的监听器在 schema 注册服务器端设置为 http://localhost:8081,即 broker1。

然后尝试使用下面的代码发送消息

我得到的是

我没有使用 Docker 容器。集群由 3 个独立的 VM 组成,其中安装并运行 Kafka 和 Registry Schema,因此它也不是独立的。Python 代码从具有网络访问权限和防火墙例外的第四个 VM 执行。事实上,我能够在没有 avro 和注册表模式的情况下生成和使用消息,所以我不认为这个问题与网络有关,但我对想法持开放态度。当我尝试将 Avro 与 Registry Schema 一起使用时,出现上述错误。此外,错误指出注册表模式端口 8081 所以问题应该与它有关,但我不知道还要寻找什么。

0 投票
1 回答
1467 浏览

avro - confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161 : ValueError

我是 python 的新手,并试图使用 'confluent_kafka' 来生成 avro 消息。使用 'confluent_kafka.schema_registry.avro.AvroSerializer' 相同(参考:https ://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py )

它适用于带有 dict(json 转换为 dict) 输入的简单 avro 模式,但对于以下示例模式,我收到错误:

架构:

输入 JSON :

错误 : ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111, 'CameraId': 222222222}} (type <class 'dict'>) do not match ['null', {'connect.name': 'CoreOLTPEvents.dbo.Event.Value', 'type': 'record', 'name': 'CoreOLTPEvents.dbo.Event.Value', 'fields': [{'name': 'EventId', 'type': 'long'}, {'default': None, 'name': 'CameraId', 'type': ['null', 'long']}]}] on field before

'before' 字段类型是联合(['null',record]),如果我将其更改为仅记录(删除联合),那么它可以正常工作。但是我需要调整我的输入,使其适用于给定的模式。

(注意:我正在使用 'json.load(json_file)' 读取 json 输入,因此它提供了 dict 输出)

任何帮助将非常感激。

更新:实际大架构:

大型模式的输入:

错误 :

0 投票
0 回答
2167 浏览

confluent-kafka-python - 融合 kakfa 生产者 KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

我是 Kafka 新手,使用 confluent kafka 并尝试使用带有 'sasl.mechanism': 'PLAIN','security.protocol': 'SASL_SSL' 的 python 生产者代码从 AWS EC2 实例向现有 kafka 主题写入消息。我试过了此链接中的生产者示例。出现以下错误。如果有任何可以指导我导致此问题的原因将有很大帮助。

0 投票
0 回答
27 浏览

apache-kafka - 使用 Kafka-Python 共享文件夹/文件

  • 我正在整合两个项目,我正在使用 Kafka 作为它们之间的通信渠道
  • 我可以从生产者(项目 A)向项目 B 上的消费者发送数据文件(CSV 格式)和数据帧(从 influxdb 中提取)
  • 但是如何通过 Kafka 发送非简单文本的目录或其他文件(如 ai/ml 模型)
0 投票
0 回答
270 浏览

python - 如何在 avro 序列化期间在 Python 中设置具有十进制值比例和精度的 avro 'bytes' 字段值?

我正在尝试使用我拥有的模式向 kafka 生成一条 avro 消息。(使用 confluent-kafka python 包生产者)

生产者工作正常,除了“字节”字段值,它没有在消费者端正确反序列化。这些“字节”字段的值实际上是十进制值,必须设置比例和精度。

我可以在 Scala 中做到这一点,下面是 Scala 的代码,我正在寻找 Python。

架构(仅特定字段):

斯卡拉实现:

谢谢

0 投票
1 回答
162 浏览

confluent-schema-registry - 添加新字段时的架构演变

想象有分开的应用程序:生产者和消费者。

生产者代码:

basic_schema.avsc文件位于生产者应用程序中。其内容:

目前,消费者内部的内容并不重要。

我们运行生产者一次,一切正常。然后我想添加年龄字段:

basic_schema.avsc

在这里我得到了错误:

confluent_kafka.avro.error.ClientError:不兼容的 Avro 架构:409

他们在这里说https://docs.confluent.io/platform/current/schema-registry/avro.html#summary对于兼容性类型 == BACKWARD 消费者应该首先更新。

我无法从技术上理解。我的意思是我必须将basic_schema.avsc文件复制到消费者并运行它吗?

0 投票
0 回答
595 浏览

python-3.x - confluent-kafka 消费者使用 asyncio 时遇到问题

我正在尝试将asyncio功能集成到我的 kafka 主题侦听器中,并且遇到了问题(python 中的异步编程非常新)。

我有一个confluent-kafka consumer正在听一个主题的创建。该主题经常有消息,性能是最重要的(因此引入了异步 io)。

main() 函数如下所示:

本质上,我想以线性方式从主题中提取消息,但是消息的处理应该是异步的……这意味着发生的任何数据库 I/Ohandle_message等都将被异步处理(我设置了等待等)在该功能中正确)。问题是我似乎从未在 asyncio.ensure_future() 中开始执行。 当我从 kafka 主题中提取消息时,如何不断地将任务添加到异步循环中? 使用confluent-kafka==1.5.0

0 投票
0 回答
5 浏览

confluent-kafka-python - 不同的设备ID到不同的分区

我有equipment_id那个可以1, 5, 7 or 10,我希望它们被放置在不同的分区中。我怎样才能做到这一点?我是否必须首先创建适当数量的分区(在本例中为 4 个)?如果是这样,下一步是什么?

方法中有partition参数produce()。但它接受分区的序列号:0, 1, 2, 3. 如果我str(equipment_id)key参数定义它总是将数据发送到分区 0(根据输出delivery_callback())。