问题标签 [confluent-kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 使用 Structlog 获取最后一个日志值并将其作为变量传递给函数
目前我正在尝试实现一个函数调用,该函数调用将失败的消息从转换器发送到带有 Kafka 的 DLQ 主题。作为 DLQ 消息的一部分,我想包含我们也记录的异常错误。
编码:
我需要获取最新 log.error() 调用的值并将其分配给变量:error_message
我在另一个异常块中调用了这个完全相同的函数,但它在 log.error() 调用中具有不同的值,所以我需要一些能够获取最后/最新错误消息的东西。
pykafka - confluent-kafka-python 库:每个消费者组每个主题的读取偏移量
由于pykafka EOL,我们正在迁移到confluent-kafka-python。因为pykafka
我们编写了一个详细的脚本,它以以下格式生成输出:
话题 | 消费群体 | 抵消 |
---|---|---|
topic_alpha | total_messages | 100 |
topic_alpha | 消费者_a | 10 |
topic_alpha | 消费者_b | 25 |
我想知道是否有一个 Python 代码知道如何为confluent-kafka-python
?
小字:有一个关于如何读取每个给定 consumer_group 的偏移量的部分示例。但是,我很难在consumer_group
不手动解析的情况下获取每个主题的列表__consumer_offsets
。
python - 汇合模式注册表 - AvroProducer - 连接错误
我正在尝试使用 AvroProducer 的confluent_kafka类以 Avro 格式生成消息。Kafka 和 Schema-Registry 在同一网络中作为 3 个节点的集群运行。
我正在从字符串中读取模式并初始化 AvroProducer,如下所示:
schema-registry 的监听器在 schema 注册服务器端设置为 http://localhost:8081,即 broker1。
然后尝试使用下面的代码发送消息
我得到的是
我没有使用 Docker 容器。集群由 3 个独立的 VM 组成,其中安装并运行 Kafka 和 Registry Schema,因此它也不是独立的。Python 代码从具有网络访问权限和防火墙例外的第四个 VM 执行。事实上,我能够在没有 avro 和注册表模式的情况下生成和使用消息,所以我不认为这个问题与网络有关,但我对想法持开放态度。当我尝试将 Avro 与 Registry Schema 一起使用时,出现上述错误。此外,错误指出注册表模式端口 8081 所以问题应该与它有关,但我不知道还要寻找什么。
avro - confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161 : ValueError
我是 python 的新手,并试图使用 'confluent_kafka' 来生成 avro 消息。使用 'confluent_kafka.schema_registry.avro.AvroSerializer' 相同(参考:https ://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py )
它适用于带有 dict(json 转换为 dict) 输入的简单 avro 模式,但对于以下示例模式,我收到错误:
架构:
输入 JSON :
错误 :
ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111, 'CameraId': 222222222}} (type <class 'dict'>) do not match ['null', {'connect.name': 'CoreOLTPEvents.dbo.Event.Value', 'type': 'record', 'name': 'CoreOLTPEvents.dbo.Event.Value', 'fields': [{'name': 'EventId', 'type': 'long'}, {'default': None, 'name': 'CameraId', 'type': ['null', 'long']}]}] on field before
'before' 字段类型是联合(['null',record]),如果我将其更改为仅记录(删除联合),那么它可以正常工作。但是我需要调整我的输入,使其适用于给定的模式。
(注意:我正在使用 'json.load(json_file)' 读取 json 输入,因此它提供了 dict 输出)
任何帮助将非常感激。
更新:实际大架构:
大型模式的输入:
错误 :
confluent-kafka-python - 融合 kakfa 生产者 KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
我是 Kafka 新手,使用 confluent kafka 并尝试使用带有 'sasl.mechanism': 'PLAIN','security.protocol': 'SASL_SSL' 的 python 生产者代码从 AWS EC2 实例向现有 kafka 主题写入消息。我试过了此链接中的生产者示例。出现以下错误。如果有任何可以指导我导致此问题的原因将有很大帮助。
apache-kafka - 使用 Kafka-Python 共享文件夹/文件
- 我正在整合两个项目,我正在使用 Kafka 作为它们之间的通信渠道
- 我可以从生产者(项目 A)向项目 B 上的消费者发送数据文件(CSV 格式)和数据帧(从 influxdb 中提取)
- 但是如何通过 Kafka 发送非简单文本的目录或其他文件(如 ai/ml 模型)
python - 如何在 avro 序列化期间在 Python 中设置具有十进制值比例和精度的 avro 'bytes' 字段值?
我正在尝试使用我拥有的模式向 kafka 生成一条 avro 消息。(使用 confluent-kafka python 包生产者)
生产者工作正常,除了“字节”字段值,它没有在消费者端正确反序列化。这些“字节”字段的值实际上是十进制值,必须设置比例和精度。
我可以在 Scala 中做到这一点,下面是 Scala 的代码,我正在寻找 Python。
架构(仅特定字段):
斯卡拉实现:
谢谢
confluent-schema-registry - 添加新字段时的架构演变
想象有分开的应用程序:生产者和消费者。
生产者代码:
basic_schema.avsc文件位于生产者应用程序中。其内容:
目前,消费者内部的内容并不重要。
我们运行生产者一次,一切正常。然后我想添加年龄字段:
basic_schema.avsc:
在这里我得到了错误:
confluent_kafka.avro.error.ClientError:不兼容的 Avro 架构:409
他们在这里说https://docs.confluent.io/platform/current/schema-registry/avro.html#summary对于兼容性类型 == BACKWARD 消费者应该首先更新。
我无法从技术上理解。我的意思是我必须将basic_schema.avsc文件复制到消费者并运行它吗?
python-3.x - confluent-kafka 消费者使用 asyncio 时遇到问题
我正在尝试将asyncio
功能集成到我的 kafka 主题侦听器中,并且遇到了问题(python 中的异步编程非常新)。
我有一个confluent-kafka consumer
正在听一个主题的创建。该主题经常有消息,性能是最重要的(因此引入了异步 io)。
main() 函数如下所示:
本质上,我想以线性方式从主题中提取消息,但是消息的处理应该是异步的……这意味着发生的任何数据库 I/Ohandle_message
等都将被异步处理(我设置了等待等)在该功能中正确)。问题是我似乎从未在 asyncio.ensure_future() 中开始执行。 当我从 kafka 主题中提取消息时,如何不断地将任务添加到异步循环中? 使用confluent-kafka==1.5.0
confluent-kafka-python - 不同的设备ID到不同的分区
我有equipment_id
那个可以1, 5, 7 or 10
,我希望它们被放置在不同的分区中。我怎样才能做到这一点?我是否必须首先创建适当数量的分区(在本例中为 4 个)?如果是这样,下一步是什么?
方法中有partition
参数produce()
。但它接受分区的序列号:0, 1, 2, 3
. 如果我str(equipment_id)
为key
参数定义它总是将数据发送到分区 0(根据输出delivery_callback()
)。