问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
720 浏览

python - 从 python-kafka 转换为 confluent kafka - 如何使用 SASL_SSL、OAUTHBEARER 和令牌创建奇偶校验

我有一个可以工作的python kafka,它是代码:

我的融合 python 如下,我收到此错误

那么如何让融合的 kafka 工作呢?我似乎对使用 OAUTHBEARER 和 SASL_SSL 的 oauthbearer_token_refresh_cb 有问题。

本质上,我使用 jwt 令牌进行身份验证

0 投票
0 回答
1101 浏览

python-3.x - AWS MSK 设置通告的侦听器

我正在尝试设置我的 MSK 集群的 Advertisementd.listeners 配置。但是,我收到以下错误:

要求失败:inter.broker.listener.name 必须是在 Advertisementd.listeners 中定义的侦听器名称

我不确定这个错误是什么意思。我已经尝试搜索它,但什么也没有出现。我有所有与 VPC 相关的配置设置。我能够从集群中读取主题名称和其他配置。

我正在使用 python 的 confluent_kafka 模块的管理客户端。

这是代码:

0 投票
1 回答
201 浏览

apache-kafka - 使用 confluent-kafka-python 设置主题模式

我是 confluent-kafka-python 的新手,我正在使用cp-all-in-one 中的docker-compose 运行 kafka 。我创建了一个生产者,为一个与官方教程相同的新主题生成数据。

但我找不到有关如何在代码中设置架构或与数据库的连接的任何信息。

我正在尝试将生成的数据保存到数据库中的表中,以便稍后我可以将其加载到 UI 上。现在每次我刷新 Confluent Control Center 主题页面时,似乎我都会丢失数据。有这方面的教程吗?还是我错误地使用了 Kafka。

0 投票
0 回答
765 浏览

python-3.x - 一些 Python Confluent Kafka Consumer 挂起而无法正常工作

我有 5 个消费者用于 avro 主题。对于每个消费者,它group.id是不同的,它是随机生成的。所以没有共享group.id。我注意到,无论我重启多少次,消费者都无法连接到代理,有时甚至更多。他们只是挂起。

所以我开始对分配给group.id有问题的消费者的名称进行一些更改,我发现相同的值使消费者工作,而其他一些则没有。

首先,这就是我正在使用的

这是消费者

例如,如果我将值分配给services_registry_consumer_test_01group.id,一切都按预期工作,我得到了

如果我将值分配给services_registry_consumer_test_02key group.id,消费者,以前工作的同一个,现在挂起而没有设法订阅给定的主题。

如果我重新分配值services_registry_consumer_test_01,一切都会按预期开始工作。我知道这听起来可能很奇怪,但事实就是如此,而且我已经睡了好几天了。

我不知道我是否在这里做错了什么,我非常感谢您对此的看法。

0 投票
1 回答
341 浏览

apache-kafka - 用于嵌套记录的 Confluent Kafka 生产者消息格式

我在 kafka 主题中注册了一个 AVRO 模式,并试图向它发送数据。该架构具有嵌套记录,我不确定如何使用 confluent_kafka python 正确地将数据发送给它。

示例架构: *ingore 架构中的任何拼写错误(真实的非常大,只是一个示例)

我正在尝试使用 confluent_kafka python 版本将数据发送到此模式。当我之前完成此操作时,记录没有嵌套,我将使用典型的字典key: value对并将其序列化。如何发送嵌套数据以使用模式。

到目前为止我尝试过的...

此版本不会导致任何 kafka 错误,但所有列都显示为 null

和...

此版本在架构中产生了 kafka 错误。

0 投票
1 回答
690 浏览

confluent-platform - 如何连接 Kafka python 以接受 jaas 的用户名和密码,就像在 Java 中完成的一样?

使用现有的工作 Java 示例,我正在尝试使用 python-kafka 和 confluent_kafka 库编写与生产者等效的 python。如何使用以下 Java 中的信息在 python 中配置 sasl.jass.config?

0 投票
1 回答
246 浏览

python - 如何在kafka消费者中读取和处理高优先级消息?

有什么方法可以优先处理高优先级的消息吗?

我尝试创建三个主题“高”、“中”和“低”,并使用一个消费者订阅所有三个主题,如果“高”主题中有未处理的消息,它将暂停另外两个。有没有更好的方法来实现消息优先级?

我尝试使用下面给出的逻辑。

0 投票
0 回答
460 浏览

python - 使用 python 多线程的 Kafka 生产者

我正计划构建一个关键组件,用于向 kafka 生成消息。我只是在想,python 多线程有什么方法可以帮助我们用 asyncio、多线程编写高效的 kafka 生产者?

另外在这里我计划按需创建一个线程,用例就像kafka生产者脚本需要消耗文本文件并产生它kafka(在这里抓住,这需要按需完成)。设计就像,kafka 生产者脚本将从 redis/rabbitmq 队列中读取按需请求,一旦我们收到请求,我计划为每个请求创建一个线程。请求包含要读取和发送的文本文件。

是否可以使用多线程和 asyncio 在 python 中实现?对此的任何帮助表示赞赏。谢谢

0 投票
1 回答
2094 浏览

python - 使用 Python 获取 Confluent Kafka 主题的最新消息

这是我到目前为止所尝试的:

msg几乎总是最终成为None虽然。我认为问题可能是'my-group'已经消费了所有消息'my.topic'......但我不在乎消息是否已经被消费 - 我仍然需要最新消息。具体来说,我需要最新消息中的时间戳。

我尝试了更多,从这里看起来主题中可能有 25 条消息,但我不知道如何获取它们:

如果没有消息,因为该主题根本没有写入任何内容,我该如何确定?如果是这种情况,我如何确定该主题存在多长时间?我正在寻找一个脚本,该脚本会自动删除过去 X 天内未写入的任何主题(最初为 14 天 - 可能会随着时间的推移对其进行调整。)

0 投票
0 回答
506 浏览

amazon-web-services - Kafka: Understanding Broker failure

I have a Kafka cluster with:

  • 2 brokers b-1 and b-2.
  • 2 topics with both: PartitionCount:1 ReplicationFactor:2 min.insync.replicas=1

Here is what happened:

Within the code, I got this error when my producer performed a poll around that time:

Broker b-2 logs have this:

My understanding here is that (1) b-2 went down i.e. unable to connect to Zookeeper (2) Messages were produced to b-1 successfully during this time. (3) b-1 was also trying to forward messages to b-2during this downtime due to the replication factor set to 2 (4) All these forwarded messages (ProduceRequests) got timed-out after 600s

My question:

  1. Is my understanding correct and how I can prevent this from happening again?
  2. If I had 3 brokers here, would b-1 have tried to connect to b-3 right away rather than waiting for b-2? Is that a good workaround? (Assuming topic replication factor = 2 everywhere)