0

client.flush()我有一个 leetle 测试程序,除非我做一个before ,否则它不起作用client.close()对nats/aio/client.py的天真阅读似乎表明它close()做了某种flush(),所以我不明白为什么我的测试程序失败了。有必要一直flush()在之前close()吗?示例程序似乎没有表明这一点。

我可以看到close()调用_flush_pending(),但这与以下内容完全不同flush()

  • flush()调用_send_ping()发送一个PING并等待PONG响应。_send_ping()直接写入self._io_writer,然后调用_flush_pending()

  • _flush_pending()将 None (我猜什么都可以)推入self._flush_queue. 这大概会唤醒_flusher()并使其将所有内容self._pending写入self._io_writer.

  • publish()调用_send_command()以将消息推送到self._pending,然后还调用_flush_pending()以使_flusher()写入所有内容。

测试程序:

#!/usr/bin/env python3.5

import asyncio
import nats.aio.client
import nats.aio.errors

async def send_message(loop):
    mq_url = "nats://nats:password@127.0.0.1:4222"
    client = nats.aio.client.Client()
    await client.connect(io_loop=loop, servers=[mq_url])
    await client.publish("test_subject", "test1".encode())
    #await client.flush()
    await client.close()

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send_message(loop))
    loop.close()

if __name__ == '__main__':
    main()

FWIW,如果我发送大量消息,那么在某个时候(尚未确定确切的条件),会发送消息。我们注意到在处理文件集合并为文件中的每一行发送消息时的行为:一些文件可以通过,而其他文件则不能,结果是更大的文件(更多行)成功了。所以看起来好像一些内部缓冲区已被填充,这会强制刷新。

4

1 回答 1

0

看起来可能是一个刚刚修复的错误:https ://github.com/nats-io/asyncio-nats/pull/35

于 2017-05-05T02:58:18.010 回答