我有一个 confluent-kafka 消费者,它将每天运行一次。在它运行时,我只想从该主题中获取最后一条消息。我有以下代码,但没有获取最后一条消息,只是读取最后一条消息以外的消息。我的 confluent-kafka 消费者代码如下所示:
cfg = {
'bootstrap.servers': host,
'group.id': groupName,
'api.version.request': False,
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'largest'},
'security.protocol': 'SSL',
'ssl.key.password': 'pswd',
'ssl.ca.location': certPath,
}
C = Consumer(cfg)
C.assign([TopicPartition(topicName, 1)])
C.subscribe([topicName])
msgList=[]
while True:
msg = C.poll(6.0)
if msg:
jsonmsg= json.loads(msg.value())
if jsonmsg[expectedValue]==eventId:
msgList.append(eventId)
break
其中 eventId 是最后发布的消息上的值。我需要包含任何参数还是我在配置中遗漏了什么?任何帮助,将不胜感激。