2

测试 Sanic,目前有一些路由,当命中时会触发对 SQS 的写入。尝试通过将信息添加到队列中来使写入异步,然后“独立”(以非阻塞方式)使用 Sanic 服务器返回的响应。

下面是我到目前为止的代码。它调用了 SQS,但是似乎我在使用错误的循环/创建多个循环时出错-> 我收到一条错误消息,指出“循环参数必须与未来一致”,并且服务器只是挂起,根本没有返回响应。

此外,Sanic 使用 uvloop,我不确定如何/是否应该将队列集成到 uvloop 而不是单独的 asyncio 循环中。Sanic 服务器通过传递一个 uvloop (uvloop.new_event_loop()) 来实例化。

import asyncio
asyncio_loop = asyncio.get_event_loop
async_queue = asyncio.Queue()

async def consumer_async_queue(q):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        else:
            # function call to SQS
            await record_to_sqs(item['json'], item['log_to_meta'])
            q.task_done()

async def producer_async_queue(q, item):
    await q.put(item)
    await q.put(None)
    await q.join()

async def main(q, item):
    producers = asyncio_loop.create_task(producer_async_queue(q, item))
    consumers = asyncio_loop.create_task(consumer_async_queue(q))
    await asyncio.wait([producers] + [consumers])

async def service():
    * Other Stuff *
    try:
        print(dir(asyncio_loop))
        asyncio_loop.create_task(main(async_queue, item))
        asyncio_loop.run_forever()
    except Exception as e:
        print(e)
        print("ERRORING")
    finally:
        pass
        # asyncio_loop.close()


@app.route('/api/example', methods=['GET'])
async def route(request):
    return await service(request)
4

0 回答 0