client.flush()
我有一个 leetle 测试程序,除非我做一个before ,否则它不起作用client.close()
。对nats/aio/client.py的天真阅读似乎表明它close()
做了某种flush()
,所以我不明白为什么我的测试程序失败了。有必要一直flush()
在之前close()
吗?示例程序似乎没有表明这一点。
我可以看到close()
调用_flush_pending()
,但这与以下内容完全不同flush()
:
flush()
调用_send_ping()
发送一个PING
并等待PONG
响应。_send_ping()
直接写入self._io_writer
,然后调用_flush_pending()
。_flush_pending()
将 None (我猜什么都可以)推入self._flush_queue
. 这大概会唤醒_flusher()
并使其将所有内容self._pending
写入self._io_writer
.publish()
调用_send_command()
以将消息推送到self._pending
,然后还调用_flush_pending()
以使_flusher()
写入所有内容。
测试程序:
#!/usr/bin/env python3.5
import asyncio
import nats.aio.client
import nats.aio.errors
async def send_message(loop):
mq_url = "nats://nats:password@127.0.0.1:4222"
client = nats.aio.client.Client()
await client.connect(io_loop=loop, servers=[mq_url])
await client.publish("test_subject", "test1".encode())
#await client.flush()
await client.close()
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(send_message(loop))
loop.close()
if __name__ == '__main__':
main()
FWIW,如果我发送大量消息,那么在某个时候(尚未确定确切的条件),会发送消息。我们注意到在处理文件集合并为文件中的每一行发送消息时的行为:一些文件可以通过,而其他文件则不能,结果是更大的文件(更多行)成功了。所以看起来好像一些内部缓冲区已被填充,这会强制刷新。