8

我正在使用 Kafka 0.8.1 和 Kafka python-0.9.0。在我的设置中,我设置了 2 个 kafka 代理。当我运行我的 kafka 消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好!

我的问题是,当我重新启动消费者时,它会从头开始消费消息。我所期待的是,在重新启动时,消费者会从它死前停止的地方开始消费消息。

我确实尝试跟踪 Redis 中的消息偏移量,然后在从队列中读取消息之前调用 consumer.seek 以确保我只收到以前未见过的消息。虽然这行得通,但在部署此解决方案之前,我想与大家核实一下……也许我对 Kafka 或 python-Kafka 客户端有一些误解。似乎消费者能够从中断的地方重新开始阅读是非常基本的功能。

谢谢!

4

5 回答 5

6

注意 kafka-python 库。它有一些小问题。

如果速度对您的消费者来说不是真正的问题,您可以在每条消息中设置自动提交。它应该有效。

SimpleConsumer 提供了一种seek方法(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185),允许您在任何时候开始使用消息。

最常见的调用是:

  • consumer.seek(0, 0)从队列的开头开始读取。
  • consumer.seek(0, 1)从当前偏移量开始读取。
  • consumer.seek(0, 2)跳过所有未决消息并开始仅阅读新消息。

第一个参数是这些位置的偏移量。这样,如果您打电话,consumer.seek(5, 0)您将跳过队列中的前 5 条消息。

另外,不要忘记,偏移量是为消费者组存储的。确保您一直使用同一个。

于 2015-03-23T07:11:01.993 回答
3

kafka-python 使用 kafka 服务器存储偏移量,而不是在单独的 zookeeper 连接上。不幸的是,支持提交/获取偏移量的 kafka 服务器 api 直到 apache kafka 0.8.1.1 才完全起作用。如果你升级你的 kafka 服务器,你的设置应该可以工作。我还建议将 kafka-python 升级到 0.9.4。

[kafka-python 维护者]

于 2014-12-12T04:37:23.877 回答
1

Kafka 消费者能够在 Zookeeper 中存储偏移量。在 Java API 中,我们有两个选项- 高级消费者,它为我们管理状态并在重新启动后从离开的地方开始消费,以及没有这种超能力的无状态低级消费者。

根据我对 Python 的消费者代码(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py)的理解,SimpleConsumerMultiProcessConsumer都是有状态的,并且会跟踪 Zookeeper 中的当前偏移量,所以奇怪的是你有这个重复消耗的问题。

确保您在重新启动时具有相同的消费者组 ID(可能是您将其设置为随机的?)并检查以下选项:

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
                     before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
                     wait before commit

您可能会消耗 < 100 条消息还是 < 5000 毫秒?

于 2014-07-14T05:12:19.163 回答
1

您只需要确保您的 Kafka Consumer 从最新的偏移量 ( auto.offset.reset="latest") 开始读取。还要确保您定义了一个消费者组,以便可以提交偏移量,并且当消费者出现故障时可以选择其最后提交的位置。


使用confluent-kafka-python

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'latest'
})

c.subscribe(['my_topic'])

使用kafka-python

from kafka import KafkaConsumer


consumer = KafkaConsumer(
    'my_topic', 
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest', 
    enable_auto_commit=True,
    group_id='mygroup'
)
于 2020-04-28T15:11:20.667 回答
1

首先,您需要设置一个 group_id,记录偏移量,以便它将继续使用来自 this 的消息group_id

如果您已经消费了组中所有现有的消息,那么您想再次重新消费这些消息。你可以使用seek来实现这一点。

这是一个例子:

def test_consume_from_offset(offset):
    topic = 'test'
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id='test')
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)   # you can set the offset you want to resume from.
    for msg in consumer:
        # the msg begins with the offset you set
        print(msg)

test_consume_from_offset(10)
于 2019-10-29T05:30:32.823 回答