问题标签 [confluent-kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - Confluent Kafka:为单个主题设置 Retention.ms 未按预期工作
我在我的项目中使用融合 Kafka,其中发送到特定主题的消息需要在保留时间后删除。所以我为各个主题设置了retention.ms,但它不起作用(在保留时间之后我仍然可以看到消息)
我浏览了大多数堆栈问题,但仍然无法找到 Kafka Retention.ms 不工作问题的正确原因/解决方案。
我按照以下步骤以毫秒为单位创建和设置保留时间。
创建了一个话题说。'用户状态'
按照下面的代码更新了它的retention.ms时间
从生产者端发送消息。
等待了 6000 毫秒的时间。
试图接收来自特定主题的消息。但是我收到了消息,并且消息仍然没有被保留策略删除。
注意: 我确保在更新retention.ms之后,我验证了在kafka主题信息(主题描述)中已经更新了相同的内容。
此外,我使用 log.retention.check.interval.ms=1 ms 更新了 server.properties,并在更新属性文件后重新启动了 Kafka 服务。
我对上述问题的期望
我想将retention.ms 设置为单个主题,并且应该按照Kafka策略中的定义自动删除传递该时间的消息。
我当前的代码现在发生了什么。 即使在保留时间之后,消费者仍会收到消息。
python - Python Azure Functions kafka 连接池
在 Azure 函数中,我们如何将 Kafka 生产者连接设为单例或连接池。每次触发函数时,都会创建一个新的 Kafka 连接。
python - 如何处理 confluent_kafka python 中的 ValueDeserializationError?
这是我用来创建新消费者的基本消费者类。它适用于"enable.auto.commit":True
消费者。但是,当我创建一个带有enable.auto.commit=False
任何 (KeyDeserializationError, ValueDeserializationError) 异常的消费者时,我需要在 except 块中手动提交该消息。由于这个基类也将用于 auto-commit=True,所以这条线 self.consumer.commit() 也被这些类型的消费者调用。
- 通过为消费者调用commit()在内部给出任何问题?
auto.commit=True
(当我在本地尝试时似乎很好) KeyDeserializationError
对 ( ,ValueDeserializationError
) 异常的理想处理应该是什么auto.commit=False
?
python - 如果在自动提交模式下运行的消费者调用 commit() 方法会发生什么?
我consumer.commit()
在 auto.commit=True 模式下运行的消费者的每条消息上调用该方法。当我在本地尝试它时,它没有任何数据丢失或数据重复。
该commit()
方法可以给消费者带来什么影响?
jdbc - Kafka/questDB JDBC Sink 连接器:未使用“topics.regex”创建的表
我使用“confluentinc/kafka-connect-jdbc:10.2.6”作为我的 JDBC 连接器将 Kafka 主题传输到我的 questDB 中。
当我提供明确的主题名称时,它按预期工作。但是,当我使用基于正则表达式的主题名称时,它不起作用,我的 questDB 数据库中没有创建表。
我的 JDBC 设置中缺少什么?
谢谢!
显式版本(工作):
正则表达式版本(不工作):
python - 如何在本地安装的 Kafka 中配置 TLS/SSL?
我正在尝试在 Kafka 中配置 SSL(在我的 Windows 本地安装)。我正在使用 confluent-kafka python 客户端。大多数解决方案都是针对 java 的,它涉及创建一些无法清楚理解的信任库、密钥库和 jass 配置。此外,我必须在属性文件(服务器/生产者/消费者)中进行哪些更改也不清楚。
这是生产者.py
这是消费者.py
我没有更改 server.properties 或 consumer/producer.properties 中的任何服务器配置。当我运行我的消费者时,我收到了这个错误:
对于生产者.py:
python - Fastavro schemaless_reader 无法使用最新模式反序列化消息
我正在使用 fastavro schemaless_reader 反序列化来自 Kafka 主题的 avro 消息。我注意到一个schema_latest
与上一条消息不兼容的问题,这不应该是这种情况,因为schema_latest
它只在以前的模式中添加了 2 个字段。
而且我可以使用schema_old
.
您能否帮助建议如何使用 fastavro 处理向后兼容的 avro 架构更改?非常感谢您的帮助!
python - confluent-kafka 超时 OffsetCommitRequest
我正在使用 confluent-kafka 1.8.2。面临一个问题,在 N 条消息之后,消费者因错误而崩溃。
GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60165 毫秒后,超时 #0)
请帮助我了解问题所在以及如何解决。我使用具有以下参数的消费者:
更多日志
消费者组会话在 300005 毫秒后超时(处于稳定的加入状态),但组协调器没有成功响应(代理 5,最后一个错误是本地:队列中超时):撤销分配并重新加入组
GroupCoordinator/5:超时 0 个正在进行中,0 个重试队列,1 个队列外,0 个部分发送的请求
GroupCoordinator/5: 飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #0) GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #1) GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #2 ) GroupCoordinator/5: 飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #3) GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #4) GroupCoordinator/5:8301 飞行中超时,0 重试队列, 47924 出队, 1 部分发送的请求
python - Confluent-Kafka python delete_topic被重新创建
当我尝试使用 Confluent-Kafka AdminClient Delete_topic 删除 Kafka 主题时,该主题暂时被删除,但几秒钟后会重新创建。
我尝试了Producer.flush() , Consumer.unsubscribe()以避免重新创建,但仍然无法弄清楚为什么要重新创建它。
但是,当我尝试在整个过程完成后使用单个 python 脚本手动删除同一主题时,它会被删除。
avro - AvroSerializer:订单簿快照的架构
我有一个 Kafka 集群正在运行,我想将 L2-orderbook 快照存储到一个主题中,该主题有一个 {key:value} 对字典,其中键的类型为 float,如下例所示:
我下面的架构建议不起作用,我很确定这是因为“出价”和“询问”字典中的键不是字符串类型。
KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="no value and no default for bids"}
什么是适当的 avro 模式?