问题标签 [confluent-kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 如何使用 kafka-python 检测分区结束?
我想尝试使用kafka-python包而不是 Confluent Kafka python 绑定,因为我想要一个纯 Python 包,而只有kafka-python可以做到这一点。但是,我无法弄清楚如何在 Confluent Kafka 中实现我所依赖的特定行为。
在 Confluent Kafka python 绑定中,可以将消费者配置为在到达分区末尾时返回错误消息。我用它来检测何时没有更多消息要处理,至少在一段时间内。但是,我无法找到正确的方法来使用kafka-python做同样的事情。
在开始处理消息之前,我可以向KakfaConsumer请求结束偏移量,但这会引入竞争条件;在我查询它之后以及我正在处理消息时,结束偏移量可能会移动。或者,我可以在处理每条消息后请求结束偏移量,但我担心这种方法可能会降低性能。
所以,虽然我有很多关于我能做什么的想法,但我不知道我应该做什么。
欢迎提供有关已接受方法的任何线索。
python - 如何对kafka生产者进行单元测试(python)
我正在使用 confluent-kafka 客户端在 kafka 集群中发送和接收消息。我有一个需要单元测试的生产者应用程序。我不想为此目的启动 Zookeeper 和 Kafka 服务器。有没有更简单的方法来使用 pytest 对其进行测试?
amazon-web-services - AWS lambda 到 Confluent Cloud 延迟问题
我目前在 Confluent 云上使用基本版本的集群,我只有一个主题有 9 个分区。我有一个使用 AWS lambda 服务设置的 REST Api,该服务将消息发布到 Kafka。目前我正在以每秒 5k-10k 的请求进行压力测试管道,我发现延迟达到 20-30 秒才能发布大小为 1kb 的记录。单个请求通常为 300 毫秒。我将 linger.ms - 500 ms 和 batch.size 等生产者配置添加到 100kb。我看到了一些改进(每个请求 15-20 秒),但我觉得它仍然太高了。有什么我遗漏的东西,还是融合云上的基本集群有什么东西?集群上的所有配置都是默认的。
apache-kafka - KSQL 无法加入加入的流,ksql 中的菊花链
我们正在使用 ksqldb 0.18 并尝试从这些主题的派生流中以菊花链方式连接一些主题。Afaik 派生主题中的底层消息可以使用 ksql 进行连接和查询,就像我们自己创建主题一样
但是,当我尝试从中创建一个新主题时,内部连接会静默失败,最后我会收到一部分消息。
这是预期的行为吗?
TLDR : (A+B) = Z; Z+C 的输出与 A+B+C 不同
python - 何时在融合的 kafka 客户端中调用传递回调并出现错误?
Confluent Kafka 库(在本例中为 python 版本)有一个 producer 方法,该方法采用传递回调函数:
无论消息是否成功传递,都会调用此回调:
我想知道当这个传递回调被错误调用时的场景是什么?我试图关闭 kafka 代理服务器,然后生成并刷新消息。仍然没有调用回调并出现错误,而是下次我在运行 Kafka 代理的情况下刷新时,前一条消息的成功传递回调首先出现。
那么我怎样才能有错误回调呢?
python - 我们如何在类构造函数中直接使用 json.dumps,而不是通过单独的函数调用它?
我们如何json.dumps
在构造函数中直接使用,而不是通过单独的函数调用它?
当我使用 KafkaProducer 类的生产方法时,它适用于上述实现。由于 json_serialize 函数仅用于json.dumps(obj)
.
json.dumps
如果我使用如下所示的直接输入尝试此操作KafkaProducer
,然后调用产生类方法,它将不起作用。给出错误为:
python-3.x - 只读 kafka 主题中的新消息
我正在用python中的confluent-kafka创建一个消费者,我想以这样一种方式创建它,如果消费者重新启动,它会从主题中的最后一条可用消息(每个分区)开始,它是否无关紧要在没有提交的情况下留下消息。
这是为了避免处理数以百万计的消息,这些消息是在消费者关闭时生成的,并且不再需要处理。
我尝试设置参数auto.offset.reset的不同选项,但最多从最后提交的偏移量开始。这是我的配置:
有没有办法实现这种行为?
注意:我可能有多个消费者,但没有一个手动分配给特定分区
python - 浮士德:TypeError:produce()得到了一个意外的关键字参数'timestamp'
在尝试重现浮士德文档和 Kafka 中显示的示例时,我得到了以下堆栈:
错误显示:self._quick_produce( TypeError: produce() got an unexpected keyword argument 'timestamp'
工人的代码用户是:
对于发件人,代码是:
我尝试使用其他传输编解码器,在记录中添加时间戳,但它不起作用
python - Apache Kafka Kerberos 使用 Python 进行身份验证
我需要开发一个 Python 程序来充当 Kafka 消费者并在此基础上进行一些处理。我使用了kafka-python,它在本地测试中做得很好。
但是,我的生产环境 (RHEL7) 需要 Kerberos 身份验证。Keytab 文件将用于此目的。
我找不到使用 kafka-python 引用此 Keytab 文件的方法(例如:通过在 Java Kafka Consumer 中配置 Jaas 配置文件)。
我找到了confluent-kafka-python,它似乎支持通过 Keytab 文件进行 Kerberos 身份验证。
另外,我读到如果 Java 进程处理 Kerberos 身份验证,可以使用 kafka-python。
哪种方法会是更好的解决方案?
python-3.x - 如何在使用 kafka-python 消费之前获取 kafka 主题的未消费消息计数?
我正在尝试将 Kafka 消息发布到 API,但仅当主题有新的(未使用的)消息时,我需要知道未使用的消息的数量。有什么方法可以在 Kafka-python 中做到这一点吗?