0

我有一个用 aiohttp 构建的 websocket 服务器。我在服务器错误流中不断收到此异常。

Task exception was never retrieved
future: <Task finished coro=<read() done, defined at /usr/local/lib/python3.4/dist-packages/aiohttp/streams.py:576> exception=ClientDisconnectedError()>
Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 234, in _step
    result = coro.throw(exc)
  File "/usr/local/lib/python3.4/dist-packages/aiohttp/streams.py", line 578, in read
    result = yield from super().read()
  File "/usr/local/lib/python3.4/dist-packages/aiohttp/streams.py", line 433, in read
    yield from self._waiter
  File "/usr/lib/python3.4/asyncio/futures.py", line 386, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.4/asyncio/tasks.py", line 287, in _wakeup
    value = future.result()
  File "/usr/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
aiohttp.errors.ClientDisconnectedError

客户端显示一条消息:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f67ec0f0588>

处理程序中的代码是:

@asyncio.coroutine
def sync(self, request):
    ws = web.WebSocketResponse()
    yield from ws.prepare(request)

    # while True:
    msg = yield from ws.receive()
    if msg.tp == aiohttp.MsgType.text:
        payload = msg.data
        pypayload = json.loads(payload)
        result = {'result': {}}
        for store in pypayload:
            try:
                sync_obj = yield from asyncio.async(self.prepare(store))
            except (IndexError, TypeError, ValidationError) as exc:
                yield from asyncio.async(self.handle_internal_error(exc, store))
            else:
                try:
                    sync_result, request_type = yield from asyncio.async(self.send_data(sync_obj))
                except DuplicateMappingsFound as exc:
                    yield from asyncio.async(self.handle_internal_error(exc, store))
                else:
                    if sync_result.status == 200 and request_type == 'post':
                        yield from asyncio.async(self.process_data(sync_result))
                    elif sync_result.status >= 400:
                        yield from asyncio.async(self.handle_error(sync_result, sync_obj))
                    result['result'].update(
                            {store['store_id']: sync_result.status}
                        )
                    yield from asyncio.async(sync_result.release())
        ws.send_str(json.dumps(result))
    elif msg.tp == aiohttp.MsgType.error:
        print('ws connection closed with exception {0}'.format(ws.exception()))

    yield from asyncio.async(ws.close())

    print('websocket connection closed')

    return ws

客户端代码为:

@asyncio.coroutine
def sync_store():
    resp = yield from aiohttp.get('http://localhost/stores/search')
    stores = yield from resp.json()
    total_page = stores['page']['total_page']
    page = stores['page']['current_page']
    total_resp = []
    ws_sockets = []
    while True:
        for page in range(page, total_page):
            session = aiohttp.ClientSession()
            ws = yield from asyncio.async(session.ws_connect('ws://localhost:8765/stores'))
            ws_sockets.append(ws)
            ws.send_str(json.dumps(stores['data']))
            resp = yield from asyncio.async(ws.receive())
            total_resp.append(resp.data)
            # print(resp)
            stores_resp = yield from asyncio.async(aiohttp.post('http://localhost/stores/search',
                                                                data=json.dumps({'page': page+1}),
                                                                headers={'content-type': 'application/json'}
                                                                ))
            stores = yield from asyncio.async(stores_resp.json())
        while ws_sockets:
            session = ws_sockets.pop(0)
            msg = yield from session.receive()
            if not(msg.tp == aiohttp.MsgType.close or msg.tp == aiohttp.MsgType.closed):
                ws_sockets.append(session)
            else:
                print(ws_sockets)
        break
    print(total_resp)

这可能是什么问题?我也尝试启用调试模式,但这似乎也没有提供任何有用的输出。

4

0 回答 0