我想在事件处理程序上使用一些自定义装饰器,以使处理会话更加舒适。
这是我没有自定义装饰器的事件处理程序:
@sio.event
async def test(sid, data):
async with sio.session(sid) as sess:
room_id = sess["room_id"]
print('test', room_id)
await sio.emit("test", {'msg': 'msg'}, room=room_id)
这是我的自定义装饰器活动:
import functools
async def sess_decorator(func):
@functools.wraps(func)
async def wrapper(sid, data, *args, **kwargs):
async with sio.session(sid) as sess:
return func(sid, data, sess, *args, **kwargs)
logger.error(f'Failed to get sess.\nsid: {sid}\ndata: {data}')
return wrapper
@sio.event
@sess_decorator
async def test(sid, data, sess):
room_id = sess["room_id"]
print('test', room_id)
await sio.emit("test", {'msg': 'msg'}, room=room_id)
问题是在应用@sess_decorator
事件之后似乎根本没有触发。如果我更改装饰器的顺序,在调用我的装饰器之前会出现以下回溯失败:
Traceback (most recent call last):
File "/home/pata/.pyenv/versions/3.7.7/envs/cartel_env/lib/python3.7/site-packages/socketio/asyncio_server.py", line 476, in _handle_event_internal
r = await server._trigger_event(data[0], namespace, sid, *data[1:])
File "/home/pata/.pyenv/versions/3.7.7/envs/cartel_env/lib/python3.7/site-packages/socketio/asyncio_server.py", line 504, in _trigger_event
ret = await self.handlers[namespace][event](*args)
TypeError: test() missing 1 required positional argument: 'sess'
似乎有一些图书馆设计问题?有没有其他方法可以完成相同的逻辑?