0

我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且代码在while True循环内运行良好,如文档中的示例所示。但是,我想设置一个每天只从主题中消耗一次的 cron 作业。这个想法是作业将在早上检查主题,在那个时间点消费主题中的所有消息然后停止。我尝试在 python 中实现这一点,如下所示:

msg = kafka_consumer.consume()
while msg:
  msg_val = msg.value().decode('utf-8')
  // do something with msg
  msg = kafka_consumer.consume()

问题在于它永远不会消耗任何东西。我猜第一行在第一次尝试时永远不会收到消息。它仅适用,while True但我不希望此代码无限运行,直到该时间点的最后一条消息被消耗。

4

1 回答 1

0

您可以检查循环内消费者组的偏移量,然后在“结束”的某个阈值内中断循环

您可能还想尝试使用max.poll.records消费者配置,以更好地控制您返回的记录数量

于 2020-08-25T17:48:31.917 回答