
I am running this code, and it only consumes the single most recent record from the topic. I want all of the records in the topic. The output file where I store the records also contains the records' metadata. How do I skip the metadata?

from confluent_kafka import Consumer, KafkaError
import socket
import os

os.environ['KRB5_CONFIG'] = '/etc/krb5.conf'
os.environ['KRB5_TRACE'] = '/dev/stderr'

SETTING = {
    'bootstrap.servers': '****',
    'group.id': '****',  # required by confluent_kafka; redacted like the other site-specific values
    'client.id': socket.gethostname(),
    'session.timeout.ms': 30000,
    'debug': 'security,broker,protocol,all',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.principal': '****',
    'sasl.mechanism': 'GSSAPI',
    'sasl.kerberos.keytab': '****',
    'sasl.kerberos.kinit.cmd': 'kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}',
    'enable.auto.commit': False
}

c = Consumer(SETTING)
c.subscribe(['topic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    # "w" truncates file.txt on every message, so only the last record survives
    with open("file.txt", "w") as file:
        file.write(msg.value().decode('utf-8'))

c.close()

1 Answer


Set 'auto.offset.reset': 'smallest' in SETTING. This makes the consumer start from the beginning of the topic, so it reads all of the messages instead of only the newest ones.
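A minimal sketch of the adjusted configuration, assuming the SETTING dict from the question; the group id below is a placeholder (confluent_kafka requires one, and committed offsets are stored per group):

SETTING = {
    'bootstrap.servers': '****',
    'group.id': 'replay-group',       # hypothetical group id; offsets are committed per group
    'enable.auto.commit': False,
    'auto.offset.reset': 'smallest',  # 'smallest' is librdkafka's legacy alias for 'earliest'
    # ... keep the SASL/Kerberos settings from the question unchanged ...
}

Two caveats: auto.offset.reset only applies when the group has no committed offset yet (or the stored offset is out of range), so use a fresh group.id to re-read a topic from the start. Also, the question's loop opens file.txt in "w" mode, which truncates the file on every message; switch to append mode "a" if you want all the records to end up in the file.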

answered 2021-04-30T11:27:45.027