1

我想尝试使用kafka-python包而不是 Confluent Kafka python 绑定,因为我想要一个纯 Python 包,而只有kafka-python可以做到这一点。但是,我无法弄清楚如何在 Confluent Kafka 中实现我所依赖的特定行为。

在 Confluent Kafka python 绑定中,可以将消费者配置为在到达分区末尾时返回错误消息。我用它来检测何时没有更多消息要处理,至少在一段时间内。但是,我无法找到正确的方法来使用kafka-python做同样的事情。

在开始处理消息之前,我可以向KakfaConsumer请求结束偏移量,但这会引入竞争条件;在我查询它之后以及我正在处理消息时,结束偏移量可能会移动。或者,我可以在处理每条消息后请求结束偏移量,但我担心这种方法可能会降低性能。

所以,虽然我有很多关于我能做什么的想法但我不知道我该做什么。

欢迎提供有关已接受方法的任何线索。

4

0 回答 0