1

我有一个 confluent-kafka 消费者,它将每天运行一次。在它运行时,我只想从该主题中获取最后一条消息。我有以下代码,但没有获取最后一条消息,只是读取最后一条消息以外的消息。我的 confluent-kafka 消费者代码如下所示:

cfg = {
            'bootstrap.servers': host,
            'group.id': groupName,
            'api.version.request': False,
            'enable.auto.commit': True,
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'largest'},
            'security.protocol': 'SSL',
            'ssl.key.password': 'pswd',
            'ssl.ca.location': certPath,
        }
        C = Consumer(cfg)
        C.assign([TopicPartition(topicName, 1)])
        C.subscribe([topicName])
        msgList=[]       
        while True:
            msg = C.poll(6.0)
            if msg:
               jsonmsg= json.loads(msg.value())
               if jsonmsg[expectedValue]==eventId:
                    msgList.append(eventId)
                    break

其中 eventId 是最后发布的消息上的值。我需要包含任何参数还是我在配置中遗漏了什么?任何帮助,将不胜感激。

4

0 回答 0