10

使用 kafka-python-1.0.2。

如果我有一个包含 10 个分区的主题,我如何去提交一个特定的分区,同时遍历各种分区和消息。我似乎无法在任何地方找到这样的例子,在文档或其他地方

从文档中,我想使用:

consumer.commit(offset=offsets)

具体来说,如何创建偏移量所需的分区和 OffsetAndMetadata 字典(dict,可选) - {TopicPartition: OffsetAndMetadata}。

我希望函数调用会是这样的:

consumer.commit(partition, offset)

但这似乎并非如此。

提前致谢。

4

5 回答 5

10

所以看起来我可能已经想通了,有趣的是,当你写下你的问题时会发生这种情况。这似乎有效:

meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)

需要更多测试,但如果有任何变化会更新。

于 2016-04-12T17:43:02.353 回答
7

没有必要使用元数据。看这个例子:

from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
    tp: OffsetAndMetadata(offset, None)
})

希望这可以帮助。

于 2016-10-14T12:00:18.973 回答
4
from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "test_topic"
PARTITION = 0

consumer = KafkaConsumer(
    group_id=TOPIC,
    auto_offset_reset="earliest",
    bootstrap_servers="localhost:9092",
    request_timeout_ms=100000,
    session_timeout_ms=99000,
    max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
    # do something
  1. 这只会分配一个分区并为该分区设置偏移量,如果有多个分区,则需要为每个分区分配一个,然后设置偏移量。
  2. aalmeida88 的答案有时对我有用,在某些情况下,它确实有效,并且 aalmeida88 给了我寻找的想法,它似乎也是一种有用的方法。
  3. 另一个你可能需要注意的是,当你自己分配分区时,kafka manager 似乎无法获取消费者信息,这可能是因为你在分配分区时将它设置在 kafka 而不是 zookeeper 中,所以 kafka manager 可能没有得到那个信息。希望能帮助到你!

- -编辑 - - -

找到更好的方法来做到这一点。

topic_partition = TopicPartition(TOPIC,
                                 message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()

这将从kafka获取的消息中提取分区信息并保存子句以手动分配分区,从而在程序中需要设置多个分区的偏移量(并不罕见)时带来方便。

ps:为了保证一个分区只设置一次,需要根据你的应用设置一个flag。

于 2017-05-18T01:29:20.567 回答
2

Just need to call consumer.commit()

from kafka import KafkaConsumer

KAFKA_TOPIC_NAME='KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP='KAFKA_CONSUMER_GROUP'
consumer = KafkaConsumer(
    KAFKA_TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id=KAFKA_CONSUMER_GROUP
)
for message in consumer:
    print(message.value)
    consumer.commit()    # <--- This is what we need
    # Optionally, To check if everything went good
    from kafka import TopicPartition
    print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
于 2019-07-09T01:38:54.290 回答
0
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
        
consumer = KafkaConsumer("topic_name", enable_auto_commit= False, bootstrap_servers=["128.0.0.1:9092"],group_id= "group_name")
msg = next(consumer)
consumer.commit({TopicPartition("topic_name", msg.partition): OffsetAndMetadata(msg.offset+1, '')})
于 2021-03-25T05:30:10.653 回答