我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且代码在while True
循环内运行良好,如文档中的示例所示。但是,我想设置一个每天只从主题中消耗一次的 cron 作业。这个想法是作业将在早上检查主题,在那个时间点消费主题中的所有消息然后停止。我尝试在 python 中实现这一点,如下所示:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
问题在于它永远不会消耗任何东西。我猜第一行在第一次尝试时永远不会收到消息。它仅适用,while True
但我不希望此代码无限运行,直到该时间点的最后一条消息被消耗。