12

I can't get KafkaConsumer to read the topic from the beginning, or from any other explicit offset.

Running the command-line consumer for the same topic, I do see messages when I pass the --from-beginning option; otherwise it hangs:

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

If I run it through Python, it hangs, which I suspect is caused by an incorrect consumer configuration:

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

Output:

Consuming messages from the given topic (hangs after this)

I'm using kafka-python 0.9.5 and the brokers are running Kafka 0.8.2. Not sure what the exact problem is.

Set group_id=None as suggested by dpkp, to emulate the behavior of the console consumer.


6 Answers

10

The difference between the console consumer and the Python consumer code you posted is that the Python consumer uses a consumer group to store offsets: group_id="test-consumer-group". If instead you set group_id=None, you should see the same behavior as the console consumer.
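
For reference, a minimal sketch of the question's consumer with that change, written against a current kafka-python release (which uses enable_auto_commit and 'earliest' in place of the older auto_commit_enable and 'smallest'); the topic name and broker address are taken from the question:

from kafka import KafkaConsumer

# With group_id=None there is no consumer group, so no committed offsets are
# looked up and the starting position falls back to auto_offset_reset, which
# is essentially what the console consumer does with --from-beginning.
consumer = KafkaConsumer(
    'topic_name',                          # topic name from the question
    bootstrap_servers=['localhost:9092'],
    group_id=None,
    enable_auto_commit=False,
    auto_offset_reset='earliest')

for message in consumer:
    print(message.offset, message.value)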

Answered 2016-03-15T04:33:49.610
4

auto_offset_reset='earliest' solved it for me.
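
Roughly what that looks like (broker address and topic from the question, the group name is just a placeholder); note that when a group_id is set, auto_offset_reset only applies while the group has no committed offset, or the committed offset is out of range:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic_name',                          # topic name from the question
    bootstrap_servers=['localhost:9092'],
    group_id='my-test-group',              # placeholder group name
    # Used only when this group has no valid committed offset; the default
    # 'latest' makes a fresh consumer wait for new messages instead.
    auto_offset_reset='earliest')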

Answered 2019-05-16T08:48:18.263
4

auto_offset_reset='earliest' together with group_id=None solved it for me.

Answered 2019-12-04T09:13:19.053
3

I ran into the same problem: I could receive messages with the Kafka console consumer, but couldn't get any with a Python script using the kafka-python package.

In the end, I think the reason was that I hadn't called producer.flush() and producer.close() in my producer.py, which isn't mentioned in the documentation.
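
A minimal producer sketch of that point, assuming the broker address from the question and a placeholder topic; send() only enqueues the record into an internal buffer, so a short-lived script that never flushes can exit before anything reaches the broker:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# send() returns a future and only buffers the record locally.
producer.send('topic_name', b'some message')

producer.flush()   # block until every buffered record has been sent
producer.close()   # flush once more and release network resources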

Answered 2020-11-25T11:33:00.320
0

I ran into the same problem before, so I ran kafka-topics locally on the machine running the code as a test and got an UnknownHostException. After adding the IP address and hostname to the hosts file, it worked fine both with kafka-topics and from the code. It seems KafkaConsumer was trying to fetch the messages but failed without raising any exception.
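
A small sketch of a check along those lines, using a hypothetical advertised hostname (kafka-broker); if this fails, adding the IP and hostname to the hosts file, as described above, is the fix:

import socket

# Hypothetical hostname advertised by the broker (advertised.listeners);
# replace it with whatever your broker metadata actually returns.
broker_host = 'kafka-broker'

try:
    print(broker_host, '->', socket.gethostbyname(broker_host))
except socket.gaierror as err:
    # Same failure mode as the UnknownHostException seen with kafka-topics:
    # the name needs an entry in the hosts file or in DNS.
    print('cannot resolve', broker_host, ':', err)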

Answered 2021-05-24T14:29:13.867
0

My take: print the offsets and make sure they are what you expect, using position() and seek_to_beginning(); see the comments in the code.

Two things I can't explain:

  1. Why KafkaConsumer has no partitions assigned right after instantiation (is that by design?). The workaround is to call poll() once before seek_to_beginning().
  2. Why, sometimes after seek_to_beginning(), the first call to poll() returns no data and does not advance the offset.

Code:

import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name

# ASSUMING THAT the topic exist

# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
    producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode() )
producer.flush()

# read from the topic
# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_URL],
    max_poll_records=2,
    group_id='sida3'
)

# (!?) why is this poll() needed just to get partitions assigned?
# AssertionError: No partitions are currently assigned if poll() is not called
consumer.poll()
consumer.seek_to_beginning()

# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))

from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

# (!?) sometimes the first call to poll() returns nothing and doesn't change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()

print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

print('messages: ', messages)

Output:

2.0.1
partitions of the topic:  {0, 1}
before poll() x2: 
0
0
after poll() x2: 
0
2
messages:  {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}
Answered 2020-09-17T09:52:39.737