我使用了一个在 python asyncio 上使用 NATS 的几乎标准示例。我想接收一条消息,对其进行处理,然后将结果发送回队列,但是当 NATS 断开连接(例如重新启动 gnats)时,不会引发异常。我什至确实等待asyncio.sleep (1, loop = loop)
更改上下文并抛出了断开->重新连接异常,但这并没有发生。我究竟做错了什么?可能是一个错误?
import asyncio
from nats.aio.client import Client as NATS
import time
async def run(loop):
nc = NATS()
await nc.connect(io_loop=loop)
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# Working
time.sleep(10)
# If nats disconnects at this point, the exception will not be caused
# and will be made attempt to send a message by nc.publish
await asyncio.sleep(2, loop=loop)
print("UNSLEEP")
await nc.publish("test", "test payload".encode())
print("PUBLISHED")
# Simple publisher and async subscriber via coroutine.
await nc.subscribe("foo", cb=message_handler)
while True:
await asyncio.sleep(1, loop=loop)
await nc.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()