问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
90 浏览

python-3.x - Confluent Kafka:为单个主题设置 Retention.ms 未按预期工作

我在我的项目中使用融合 Kafka,其中发送到特定主题的消息需要在保留时间后删除。所以我为各个主题设置了retention.ms,但它不起作用(在保留时间之后我仍然可以看到消息)

我浏览了大多数堆栈问题,但仍然无法找到 Kafka Retention.ms 不工作问题的正确原因/解决方案。

我按照以下步骤以毫秒为单位创建和设置保留时间。

  1. 创建了一个话题说。'用户状态'

  2. 按照下面的代码更新了它的retention.ms时间

  3. 从生产者端发送消息。

  4. 等待了 6000 毫秒的时间。

  5. 试图接收来自特定主题的消息。但是我收到了消息,并且消息仍然没有被保留策略删除。

注意: 我确保在更新retention.ms之后,我验证了在kafka主题信息(主题描述)中已经更新了相同的内容。

此外,我使用 log.retention.check.interval.ms=1 ms 更新了 server.properties,并在更新属性文件后重新启动了 Kafka 服务。

我对上述问题的期望

我想将retention.ms 设置为单个主题,并且应该按照Kafka策略中的定义自动删除传递该时间的消息。

我当前的代码现在发生了什么。 即使在保留时间之后,消费者仍会收到消息。

0 投票
0 回答
48 浏览

python - Python Azure Functions kafka 连接池

在 Azure 函数中,我们如何将 Kafka 生产者连接设为单例或连接池。每次触发函数时,都会创建一个新的 Kafka 连接。

0 投票
0 回答
103 浏览

python - 如何处理 confluent_kafka python 中的 ValueDeserializationError?

这是我用来创建新消费者的基本消费者类。它适用于"enable.auto.commit":True消费者。但是,当我创建一个带有enable.auto.commit=False任何 (KeyDeserializationError, ValueDeserializationError) 异常的消费者时,我需要在 except 块中手动提交该消息。由于这个基类也将用于 auto-commit=True,所以这条线 self.consumer.commit() 也被这些类型的消费者调用。

  1. 通过为消费者调用commit()在内部给出任何问题?auto.commit=True(当我在本地尝试时似乎很好)
  2. KeyDeserializationError对 ( , ValueDeserializationError) 异常的理想处理应该是什么auto.commit=False
0 投票
0 回答
24 浏览

python - 如果在自动提交模式下运行的消费者调用 commit() 方法会发生什么?

consumer.commit()在 auto.commit=True 模式下运行的消费者的每条消息上调用该方法。当我在本地尝试它时,它没有任何数据丢失或数据重复。

commit()方法可以给消费者带来什么影响?

0 投票
2 回答
97 浏览

jdbc - Kafka/questDB JDBC Sink 连接器:未使用“topics.regex”创建的表

我使用“confluentinc/kafka-connect-jdbc:10.2.6”作为我的 JDBC 连接器将 Kafka 主题传输到我的 questDB 中。

当我提供明确的主题名称时,它按预期工作。但是,当我使用基于正则表达式的主题名称时,它不起作用,我的 questDB 数据库中没有创建表。

我的 JDBC 设置中缺少什么?

谢谢!

显式版本(工作):

正则表达式版本(不工作):

0 投票
0 回答
80 浏览

python - 如何在本地安装的 Kafka 中配置 TLS/SSL?

我正在尝试在 Kafka 中配置 SSL(在我的 Windows 本地安装)。我正在使用 confluent-kafka python 客户端。大多数解决方案都是针对 java 的,它涉及创建一些无法清楚理解的信任库、密钥库和 jass 配置。此外,我必须在属性文件(服务器/生产者/消费者)中进行哪些更改也不清楚。

这是生产者.py

这是消费者.py

我没有更改 server.properties 或 consumer/producer.properties 中的任何服务器配置。当我运行我的消费者时,我收到了这个错误:

对于生产者.py:

0 投票
0 回答
34 浏览

python - Fastavro schemaless_reader 无法使用最新模式反序列化消息

我正在使用 fastavro schemaless_reader 反序列化来自 Kafka 主题的 avro 消息。我注意到一个schema_latest与上一条消息不兼容的问题,这不应该是这种情况,因为schema_latest它只在以前的模式中添加了 2 个字段。

而且我可以使用schema_old.

您能否帮助建议如何使用 fastavro 处理向后兼容的 avro 架构更改?非常感谢您的帮助!

0 投票
0 回答
86 浏览

python - confluent-kafka 超时 OffsetCommitRequest

我正在使用 confluent-kafka 1.8.2。面临一个问题,在 N 条消息之后,消费者因错误而崩溃。

GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60165 毫秒后,超时 #0)

请帮助我了解问题所在以及如何解决。我使用具有以下参数的消费者:

更多日志

消费者组会话在 300005 毫秒后超时(处于稳定的加入状态),但组协调器没有成功响应(代理 5,最后一个错误是本地:队列中超时):撤销分配并重新加入组

GroupCoordinator/5:超时 0 个正在进行中,0 个重试队列,1 个队列外,0 个部分发送的请求

GroupCoordinator/5: 飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #0) GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #1) GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #2 ) GroupCoordinator/5: 飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #3) GroupCoordinator/5:飞行中的 OffsetCommitRequest 超时(60000 毫秒后,超时 #4) GroupCoordinator/5:8301 飞行中超时,0 重试队列, 47924 出队, 1 部分发送的请求

0 投票
0 回答
29 浏览

python - Confluent-Kafka python delete_topic被重新创建

当我尝试使用 Confluent-Kafka AdminClient Delete_topic 删除 Kafka 主题时,该主题暂时被删除,但几秒钟后会重新创建。

我尝试了Producer.flush() , Consumer.unsubscribe()以避免重新创建,但仍然无法弄清楚为什么要重新创建它。

但是,当我尝试在整个过程完成后使用单个 python 脚本手动删除同一主题时,它会被删除。

0 投票
2 回答
52 浏览

avro - AvroSerializer:订单簿快照的架构

我有一个 Kafka 集群正在运行,我想将 L2-orderbook 快照存储到一个主题中,该主题有一个 {key:value} 对字典,其中键的类型为 float,如下例所示:

我下面的架构建议不起作用,我很确定这是因为“出价”和“询问”字典中的键不是字符串类型。

KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="no value and no default for bids"}

什么是适当的 avro 模式?