我有一个用 aiohttp 构建的 websocket 服务器。我在服务器错误流中不断收到此异常。
Task exception was never retrieved
future: <Task finished coro=<read() done, defined at /usr/local/lib/python3.4/dist-packages/aiohttp/streams.py:576> exception=ClientDisconnectedError()>
Traceback (most recent call last):
File "/usr/lib/python3.4/asyncio/tasks.py", line 234, in _step
result = coro.throw(exc)
File "/usr/local/lib/python3.4/dist-packages/aiohttp/streams.py", line 578, in read
result = yield from super().read()
File "/usr/local/lib/python3.4/dist-packages/aiohttp/streams.py", line 433, in read
yield from self._waiter
File "/usr/lib/python3.4/asyncio/futures.py", line 386, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.4/asyncio/tasks.py", line 287, in _wakeup
value = future.result()
File "/usr/lib/python3.4/asyncio/futures.py", line 275, in result
raise self._exception
aiohttp.errors.ClientDisconnectedError
客户端显示一条消息:
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f67ec0f0588>
处理程序中的代码是:
@asyncio.coroutine
def sync(self, request):
ws = web.WebSocketResponse()
yield from ws.prepare(request)
# while True:
msg = yield from ws.receive()
if msg.tp == aiohttp.MsgType.text:
payload = msg.data
pypayload = json.loads(payload)
result = {'result': {}}
for store in pypayload:
try:
sync_obj = yield from asyncio.async(self.prepare(store))
except (IndexError, TypeError, ValidationError) as exc:
yield from asyncio.async(self.handle_internal_error(exc, store))
else:
try:
sync_result, request_type = yield from asyncio.async(self.send_data(sync_obj))
except DuplicateMappingsFound as exc:
yield from asyncio.async(self.handle_internal_error(exc, store))
else:
if sync_result.status == 200 and request_type == 'post':
yield from asyncio.async(self.process_data(sync_result))
elif sync_result.status >= 400:
yield from asyncio.async(self.handle_error(sync_result, sync_obj))
result['result'].update(
{store['store_id']: sync_result.status}
)
yield from asyncio.async(sync_result.release())
ws.send_str(json.dumps(result))
elif msg.tp == aiohttp.MsgType.error:
print('ws connection closed with exception {0}'.format(ws.exception()))
yield from asyncio.async(ws.close())
print('websocket connection closed')
return ws
客户端代码为:
@asyncio.coroutine
def sync_store():
resp = yield from aiohttp.get('http://localhost/stores/search')
stores = yield from resp.json()
total_page = stores['page']['total_page']
page = stores['page']['current_page']
total_resp = []
ws_sockets = []
while True:
for page in range(page, total_page):
session = aiohttp.ClientSession()
ws = yield from asyncio.async(session.ws_connect('ws://localhost:8765/stores'))
ws_sockets.append(ws)
ws.send_str(json.dumps(stores['data']))
resp = yield from asyncio.async(ws.receive())
total_resp.append(resp.data)
# print(resp)
stores_resp = yield from asyncio.async(aiohttp.post('http://localhost/stores/search',
data=json.dumps({'page': page+1}),
headers={'content-type': 'application/json'}
))
stores = yield from asyncio.async(stores_resp.json())
while ws_sockets:
session = ws_sockets.pop(0)
msg = yield from session.receive()
if not(msg.tp == aiohttp.MsgType.close or msg.tp == aiohttp.MsgType.closed):
ws_sockets.append(session)
else:
print(ws_sockets)
break
print(total_resp)
这可能是什么问题?我也尝试启用调试模式,但这似乎也没有提供任何有用的输出。