1

我正在编写一些代码,这些代码需要向某个服务器发送并发 HTTP 请求,以防万一出现错误,它应该重试。当我运行它时,我看到快速发送,几分钟后发送速度降低到 0。就像某些东西被锁定并且没有释放,它只是停止发送。

发送数据的函数是这个

@asyncio.coroutine
def handle_line(self, line, destination):
    while True:
        response = yield from aiohttp.request('POST', destination,
                                              data=line,
                                              headers=headers,
                                              connector=self.tcp_conn)
        if response.status == 200:
            self._succeeded += 1
            if self._succeeded % 20000 == 0 and self._succeeded != 0:
                logger.debug("Sent {} events from file".format(self._succeeded))
                self.update_dynamo_counter(destination)
            break
        elif response.status != 200:
            logger.debug("Error sending data. Status: {}".format(response.status))
            continue
    return (yield from response.read())

连接器正在初始化

tcp_conn = aiohttp.TCPConnector(limit=100)

通过将 800 个任务添加到列表中来调用发送函数,然后等待它们避免阻塞 io

lines_to_send.append(asyncio.async(self.handle_line(json.dumps(data), destination)))
if len(lines_to_send) % 800 == 0:
    yield from asyncio.wait(lines_to_send)
    lines_to_send = []

我在调试它时遇到问题,甚至无法弄清楚问题所在。有人有线索吗?

4

0 回答 0