1

这是我到目前为止所尝试的:

from confleunt_kafka import Consumer

c = Consumer({... several security/server settings skipped...
              'auto.offset.reset': 'beginning',
              'group.id': 'my-group'})

c.subscribe(['my.topic'])
msg = poll(30.0)  # msg is of None type.

msg几乎总是最终成为None虽然。我认为问题可能是'my-group'已经消费了所有消息'my.topic'......但我不在乎消息是否已经被消费 - 我仍然需要最新消息。具体来说,我需要最新消息中的时间戳。

我尝试了更多,从这里看起来主题中可能有 25 条消息,但我不知道如何获取它们:

a = c.assignment()
print(a)  # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets)  # Outputs: (25, 25)

如果没有消息,因为该主题根本没有写入任何内容,我该如何确定?如果是这种情况,我如何确定该主题存在多长时间?我正在寻找一个脚本,该脚本会自动删除过去 X 天内未写入的任何主题(最初为 14 天 - 可能会随着时间的推移对其进行调整。)

4

1 回答 1

2

我遇到了同样的问题,没有这方面的例子。在我的情况下,有一个分区,我需要阅读最后一条消息,以了解该消息中的一些信息以设置我拥有的消费者/生产者组件。

逻辑是开始Consumer、订阅主题、轮询消息 ->on_assign通过将修改后的分区分配回去,触发发生倒带的地方。完成后on_assign,轮询msg继续并从主题中读取最后一条消息。

settings = {
    "bootstrap.servers": "my.kafka.server",
    "group.id": "my-work-group",
    "client.id": "my-work-client-1",
    "enable.auto.commit": False,
    "session.timeout.ms": 6000,
    "default.topic.config": {"auto.offset.reset": "largest"},
}
consumer = Consumer(settings)

def on_assign(a_consumer, partitions):
    # get offset tuple from the first partition
    last_offset = a_consumer.get_watermark_offsets(partitions[0])
    # position [1] being the last index
    partitions[0].offset = last_offset[1] - 1
    consumer.assign(partitions)

consumer.subscribe(["test-topic"], on_assign=on_assign)

msg = consumer.poll(6.0)

现在msg里面有最后一条消息。

于 2021-04-09T11:15:48.340 回答