0

我在使用官方 Confluent Kafka Python API 的基本用法时遇到错误:

我订阅:

kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)

使用回调:

def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
    print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))

for topic_partition in topic_partitions_with_offsets:
    print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

产生控制台输出:

without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}

有人可以解释一下吗?为什么我会在未知分区上收到回调通知?类似的代码使用 Java API 可以完美运行。

4

1 回答 1

5

这是底层 C 库 librdkafka 中的一个错误。请参阅上游问题

如果您想从存储的偏移量开始消费,您实际上不需要调用 position() 来检索它们,如果您不更改默认偏移量 -1001,客户端将自动执行此操作。

于 2016-06-15T09:17:39.540 回答