我的看法是:打印并确保偏移量是您所期望的。通过使用position()
and seek_to_beginning()
,请查看代码中的注释。
我无法解释:
- 为什么实例化后
KafkaConsumer
没有分配分区,这是设计使然吗?Hack around 是调用poll()
一次之前seek_to_beginning()
- 为什么有时在之后
seek_to_beginning()
,第一次调用不poll()
返回数据并且不更改偏移量。
代码:
import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name
# ASSUMING THAT the topic exist
# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode() )
producer.flush()
# read from the topic
# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here
consumer = KafkaConsumer(KAFKA_TOPIC,
bootstrap_servers=[KAFKA_URL],
max_poll_records=2,
group_id='sida3'
)
# (!?) wtf, why we need this to get partitions assigned
# AssertionError: No partitions are currently assigned if poll() is not called
consumer.poll()
consumer.seek_to_beginning()
# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))
from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))
# (!?) sometimes the first call to poll() returns nothing and doesnt change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()
print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))
print('messages: ', messages)
输出:
2.0.1
partitions of the topic: {0, 1}
before poll() x2:
0
0
after poll() x2:
0
2
messages: {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}