我正在使用 python 3.7 和 confluent-kafka。
以下是我用来轮询 kafka 服务器并阅读消息的伪代码。
while True:
MSG = CONSUMER.poll(0.1)
if MSG is None:
CONSUMER.commit()
print('No msg')
continue
if MSG.error():
print("Consumer error: {}".format(MSG.error()))
CONSUMER.commit()
continue
try:
rawMsg = format(MSG.value().decode('utf-8'))
testmsg = json.loads(rawMsg)
except:
print('invalid json format msg')
CONSUMER.commit()
如果 kafka 服务器由于某种原因关闭或断开连接,我希望抛出异常。
目前,如果发生上述情况,while 循环会继续运行而不会出现任何错误并打印No msg。
如何获得异常或检查每次循环中是否可以连接 kafka 服务器(如果要进行一些检查,它应该是轻量级的)。