我正在尝试通过在 0 0 * * * 每天使用 confluent_kafka Python 包来进行批处理 etl。我知道我的流中有 4 个分区,但它可以更改,所以有没有办法检查特定主题中的分区总数?我的消费者就是这样;
from confluent_kafka import Consumer, KafkaError
messages = list()
partition_counter = 0
tnof_partition = 4
while True:
msg = self.consumer.poll(0.1)
if msg is None:
continue
elif not msg.error():
event = json.loads(msg.value().decode('utf-8'))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached {0}/{1}"
.format(msg.topic(), msg.partition()))
partition_counter += 1
if(partition_counter == tnof_partition):
self.consumer.commit()
self.consumer.close()
break
如果您能展示实现批量消费者的替代方法,我也将不胜感激。谢谢