我在使用官方 Confluent Kafka Python API 的基本用法时遇到错误:
我订阅:
kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
使用回调:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))
for topic_partition in topic_partitions_with_offsets:
print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
产生控制台输出:
without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
有人可以解释一下吗?为什么我会在未知分区上收到回调通知?类似的代码使用 Java API 可以完美运行。