测试 Sanic,目前有一些路由,当命中时会触发对 SQS 的写入。尝试通过将信息添加到队列中来使写入异步,然后“独立”(以非阻塞方式)使用 Sanic 服务器返回的响应。
下面是我到目前为止的代码。它调用了 SQS,但是似乎我在使用错误的循环/创建多个循环时出错-> 我收到一条错误消息,指出“循环参数必须与未来一致”,并且服务器只是挂起,根本没有返回响应。
此外,Sanic 使用 uvloop,我不确定如何/是否应该将队列集成到 uvloop 而不是单独的 asyncio 循环中。Sanic 服务器通过传递一个 uvloop (uvloop.new_event_loop()) 来实例化。
import asyncio
asyncio_loop = asyncio.get_event_loop
async_queue = asyncio.Queue()
async def consumer_async_queue(q):
while True:
item = await q.get()
if item is None:
q.task_done()
break
else:
# function call to SQS
await record_to_sqs(item['json'], item['log_to_meta'])
q.task_done()
async def producer_async_queue(q, item):
await q.put(item)
await q.put(None)
await q.join()
async def main(q, item):
producers = asyncio_loop.create_task(producer_async_queue(q, item))
consumers = asyncio_loop.create_task(consumer_async_queue(q))
await asyncio.wait([producers] + [consumers])
async def service():
* Other Stuff *
try:
print(dir(asyncio_loop))
asyncio_loop.create_task(main(async_queue, item))
asyncio_loop.run_forever()
except Exception as e:
print(e)
print("ERRORING")
finally:
pass
# asyncio_loop.close()
@app.route('/api/example', methods=['GET'])
async def route(request):
return await service(request)