我正在运行这段代码，但它只消费了该主题（topic）最新的 1 条记录，而我想要读取该主题的全部记录。另外，我存储记录的输出文件中还混入了记录的元数据，如何跳过这些元数据？
from confluent_kafka import Consumer, KafkaError
import socket
import os
# Point the Kerberos libraries at the config file and enable trace logging.
# Bug fix: KRB5_TRACE was an unquoted bare name (NameError at import time).
# Bug fix: 'etc/krb5.conf' was a relative path; the standard location is
# '/etc/krb5.conf' — TODO confirm against the deployment layout.
os.environ['KRB5_CONFIG'] = '/etc/krb5.conf'
os.environ['KRB5_TRACE'] = '/dev/stderr'
# Consumer configuration.
# Fixes vs. the original:
#   - 'enable.auto.commit' had an unterminated string literal (SyntaxError).
#   - dangling 'default.topic.config': key with no value (SyntaxError);
#     modern confluent-kafka accepts these settings at the top level.
#   - 'security_protocol' -> 'security.protocol' (librdkafka uses dots).
#   - 'protocal' -> 'protocol' in the debug flags ('all' already implies
#     every category, but the typo is corrected anyway).
#   - 'sasl.kerberos.service.princiapl' -> 'sasl.kerberos.principal'.
#   - Added 'group.id': REQUIRED when using subscribe(); without it the
#     consumer raises an error.
#   - Added 'auto.offset.reset': 'earliest' — this is why only the latest
#     record was consumed: the default ('latest') starts a brand-new group
#     at the end of the topic. 'earliest' reads the topic from the beginning.
SETTING = {
    'bootstrap.servers': '****',
    'client.id': socket.gethostname(),
    'group.id': 'my-consumer-group',        # TODO: pick a stable group name
    'session.timeout.ms': 30000,
    'debug': 'security,broker,protocol,all',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.principal': '****',
    'sasl.mechanism': 'GSSAPI',
    'sasl.kerberos.keytab': '****',
    'sasl.kerberos.kinit.cmd': 'kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}',
    'auto.offset.reset': 'earliest',        # consume ALL records, not just new ones
    'enable.auto.commit': False,
}
c = Consumer(SETTING)
c.subscribe(['topic'])

# Poll the topic and append each record's payload to file.txt.
#
# Fixes vs. the original:
#   - `if msg in None` raised TypeError; the correct check is `is None`.
#   - "Consumer error: ()" had no format placeholder; use "{}".
#   - The file was re-opened with mode "w" INSIDE the loop, truncating it on
#     every message — that is why only one record ever appeared in the file.
#     Open it once, before the loop, and write each record as it arrives.
#   - c.close() was unreachable after `while True`; a try/finally guarantees
#     the consumer is released (e.g. on Ctrl-C).
#
# Note on metadata: msg.value() returns only the record payload — the
# metadata seen in the old output came from the 'debug' logging on stderr,
# not from value(); drop or reduce the 'debug' setting to silence it.
try:
    with open("file.txt", "w") as out:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            out.write(msg.value().decode('utf-8') + "\n")
finally:
    c.close()