我正在编写一些代码,这些代码需要向某个服务器发送并发 HTTP 请求,以防万一出现错误,它应该重试。当我运行它时,我看到快速发送,几分钟后发送速度降低到 0。就像某些东西被锁定并且没有释放,它只是停止发送。
发送数据的函数是这个
@asyncio.coroutine
def handle_line(self, line, destination):
while True:
response = yield from aiohttp.request('POST', destination,
data=line,
headers=headers,
connector=self.tcp_conn)
if response.status == 200:
self._succeeded += 1
if self._succeeded % 20000 == 0 and self._succeeded != 0:
logger.debug("Sent {} events from file".format(self._succeeded))
self.update_dynamo_counter(destination)
break
elif response.status != 200:
logger.debug("Error sending data. Status: {}".format(response.status))
continue
return (yield from response.read())
连接器正在初始化
tcp_conn = aiohttp.TCPConnector(limit=100)
通过将 800 个任务添加到列表中来调用发送函数,然后等待它们避免阻塞 io
lines_to_send.append(asyncio.async(self.handle_line(json.dumps(data), destination)))
if len(lines_to_send) % 800 == 0:
yield from asyncio.wait(lines_to_send)
lines_to_send = []
我在调试它时遇到问题,甚至无法弄清楚问题所在。有人有线索吗?