这是我到目前为止所尝试的:
from confleunt_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group.id': 'my-group'})
c.subscribe(['my.topic'])
msg = poll(30.0) # msg is of None type.
msg
几乎总是最终成为None
虽然。我认为问题可能是'my-group'
已经消费了所有消息'my.topic'
......但我不在乎消息是否已经被消费 - 我仍然需要最新消息。具体来说,我需要最新消息中的时间戳。
我尝试了更多,从这里看起来主题中可能有 25 条消息,但我不知道如何获取它们:
a = c.assignment()
print(a) # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets) # Outputs: (25, 25)
如果没有消息,因为该主题根本没有写入任何内容,我该如何确定?如果是这种情况,我如何确定该主题存在多长时间?我正在寻找一个脚本,该脚本会自动删除过去 X 天内未写入的任何主题(最初为 14 天 - 可能会随着时间的推移对其进行调整。)