问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
135 浏览

python - 如何在消费前获取架构?

我使用带有模式注册表的 python confluent-kafka 1.5.0 来使用来自 kafka 的 avro 消息。
我只是消费者,无法访问管理生产者或经纪人或其他东西。
我知道主题名称,并且从消息字段中我也在模式注册表客户端中获取主题和名称空间,sr 客户端使用消息中的 schema_id 获取模式。
根据我的目标,在我开始消费之前,我需要方法获取模式(仅 id 或主题名称)。也就是说,我想手动按主题名称获取架构

请仔细阅读,我知道从消息中获取 schema_id 后如何手动获取模式。

0 投票
0 回答
293 浏览

python - 无法捕获 confluent-kafka-python 超时错误

我的confluent-kafka-python- 版本 1.7.0 生产者代码如下

在生成代码时,我在 Kafka 代理中收到以下错误

我想捕获 REQTMOUT 错误并在错误时返回 False。但无法做同样的事情。任何人都可以帮助我。

0 投票
0 回答
64 浏览

python - KafkaError FATAL,代码 INVALID_PRODUCER_EPOCH 47 无法将分区添加到事务:代理:生产者尝试使用旧纪元进行操作“

这是关于:

并调用:

实际回溯:

如何重现此错误、对其进行调试或识别并掌握如何在本地进行调试?

0 投票
0 回答
27 浏览

python - 如何计算两个偏移量之间的 Kafka 消息数?

我需要使用 confluent_kafka python 包计算两个偏移量之间的消息数。我之前假设消息的数量将是偏移量之间的差异(即结束 - 开始),但显然 kafka 有时会将偏移量增加 >1,从而使上述不准确。

给定单个分区中的两个偏移量 offset_start 和 offset_end,我如何计算它们之间的消息数?

0 投票
0 回答
57 浏览

python - Confluent-kafka 消费者未阅读最后发布的消息

我有一个 confluent-kafka 消费者,它将每天运行一次。在它运行时,我只想从该主题中获取最后一条消息。我有以下代码,但没有获取最后一条消息,只是读取最后一条消息以外的消息。我的 confluent-kafka 消费者代码如下所示:

其中 eventId 是最后发布的消息上的值。我需要包含任何参数还是我在配置中遗漏了什么?任何帮助,将不胜感激。

0 投票
1 回答
131 浏览

python - 没有收到来自 Kafka 主题的消息

在此程序中调用 poll() 时我收到 None 但从 cmd 运行 kafka-console-consumer.bat 时收到消息,我无法弄清楚到底是什么问题。

执行从 main.py 开始

KafkaDiscoveryExecutor 类用于使用共享队列中的消息并处理该消息。

这是 kafka_message_consumer.py

指定的主题有事件,但我在这里得到 None 并且“if msg is None:”中的打印语句正在执行。

0 投票
0 回答
50 浏览

confluent-kafka-python - confluent-kafka-python 中的消费者偏移量

卡夫卡新手。使用 confluent-kafka python 库通过手动提交处理消费者代码。以下输出中的 offset=%s 是什么意思

0 投票
0 回答
52 浏览

python - 在Kafka中将整个消息从一个队列路由到另一个队列

我正在建立一个死信队列。我感兴趣的实现是将消息全部转发到另一个队列的能力(例如,当出现错误时)。换句话说,我对保留消息的元数据及其内容很感兴趣。

这里有一些代码来突出这个问题:

但是,我无法发送此消息。生成消息时,将消息转换为 json 会返回以下内容: TypeError: Object of type Message is not JSON serializable. 如果我尝试序列化为 JSON - 那么我似乎无法使用 json 获得消息的完整表示。

我正在使用融合的 kafka pytrhon。这是消费者 - https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluent_kafka/src/Consumer.c。生产者 - https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluent_kafka/src/Producer.c

在包含消息元数据的同时,我如何能够将消息从一个队列转发到另一个队列?

0 投票
0 回答
90 浏览

python - Python confluent-kafka 包返回返回“请求 ApiVersion 时断开连接”

环境操作系统:macOS

python库confluent_kafka==1.7.0

外部 kafka 代理版本 2.6.1

不指定任何安全协议,一切都是默认的 PLAINTEXT

运行这个

它不起作用,返回错误“请求 ApiVersion 时断开连接”

厌倦了使用单独的工具,如Kafka 工具和另一个 python 包,如kafka-python - 它正在工作,但 confluent_kafka 没有。

调试信息

%7|1637327168.507|经纪人|rdkafka#consumer-1| [thrd:app]:GroupCoordinator:添加了 NodeId -1 %7|1637327168.507|BROKER|rdkafka#consumer-1| 的新代理 [thrd:app]: my-external-host:9092/bootstrap: 添加了 NodeId -1 %7|1637327168.507|BRKMAIN|rdkafka#consumer-1| 的新代理 [thrd:GroupCoordinator]: GroupCoordinator: 进入主代理线程 %7|1637327168.507|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: 进入主代理线程 %7|1637327168.507|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.7.0 (0x10700ff) rdkafka#consumer-1 已初始化 (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,STRIP STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL 插件 ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OUTHBEARER CRC32C_HW,调试 0x2) %7|1637327168。507|连接|rdkafka#consumer-1| [thrd:main]: my-external-host:9092/bootstrap: 选择用于集群连接:协调器查询(代理有 0 次连接尝试)%7|1637327168.507|BRKMAIN|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 进入主代理线程 %7|1637327168.507|CONNECT|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 收到 CONNECT op %7|1637327168.507|STATE|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 代理更改状态 INIT -> TRY_CONNECT %7|1637327168.507|CONNECT|rdkafka#consumer-1| [thrd:app]:没有为集群连接选择任何代理:仍然抑制 49 毫秒:应用程序元数据请求 %7|1637327168.507|CONNECT|rdkafka#consumer-1| [thrd:应用程序]:未为集群连接选择任何代理:仍被抑制 49 毫秒:应用程序元数据请求 %7|1637327168.507|CONNECT|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 代理处于状态 TRY_CONNECT 连接 %7|1637327168.507|STATE|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 代理更改状态 TRY_CONNECT -> CONNECT %7|1637327168.507|CONNECT|rdkafka#consumer-1| [thrd:app]:没有为集群连接选择任何代理:仍然抑制 49 毫秒:应用程序元数据请求 %7|1637327168.509|CONNECT|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 使用套接字 11 %7|1637327168.578|CONNECT|rdkafka#consumer- 连接到 ipv4#10.103.68.24:9092(纯文本) 1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 连接到 ipv4#10.103.68.24:9092 %7|1637327168.578|CONNECTED|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 已连接 (#1) %7|1637327168.578|FEATURE|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]:my-external-host:9092/bootstrap:将启用的协议功能 +ApiVersion 更新为 ApiVersion %7|1637327168.578|STATE|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: 代理更改状态 CONNECT -> APIVERSION_QUERY %7|1637327168.578|CONNECT|rdkafka#consumer-1| [thrd:app]:集群连接已在进行中:应用程序元数据请求 %7|1637327168.580|FAIL|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]:my-external-host:9092/bootstrap:请求 ApiVersion 时断开连接:可能是由于错误的 security.protocol 配置(连接到 SSL 侦听器?)或代理版本 < 0.10(请参阅 api.version.request)(在状态 APIVERSION_QUERY 2 毫秒后)(_TRANSPORT)%6|1637327168.580|FAIL|rdkafka#消费者-1| [thrd:my-external-host:9092/bootstrap]: my-external-host:9092/bootstrap: Disconnected while requesting ApiVersion: 可能是由于不正确的 security.protocol 配置(连接到 SSL 侦听器?)或代理版本是< 0.10(参见 api.version.request)(在 APIVERSION_QUERY 状态 2 毫秒后)%7|1637327168.580|FEATURE|rdkafka#consumer-1| [thrd:my-external-host:9092/bootstrap]:my-external-host:9092/bootstrap:将启用的协议功能 -ApiVersion 更新为 %7|1637327168.580|STATE|rdkafka#consumer-1| [thrd:我的外部主机:9092/bootstrap]:我的外部主机:9092/bootstrap:

0 投票
1 回答
166 浏览

apache-kafka - Kafka消费者:如何检查主题分区中的所有消息是否全部被消费完?

是否有任何 API 或属性可用于或比较以确定一个主题分区中的所有消息是否都已被消费?我们正在进行一项测试,该测试将使用同一消费者组中的另一个消费者来检查主题分区是否仍有任何消息。我们的一项应用服务也使用 Kafka 来处理内部事件。那么有没有办法同步消息消费的进度呢?