我正在一个 FastAPI 端点工作,该端点进行 I/O 绑定操作,这是异步的以提高效率。但是,这需要时间,所以我想缓存结果以便在一段时间内重复使用它。
我目前有这个:
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
@app.get('/')
async def get(key):
return await _get_expensive_resource(key)
if __name__ == "__main__":
import uvicorn
uvicorn.run("test:app")
我正在尝试使用该cachetools
包来缓存结果,并且我尝试了类似以下的方法:
import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
class ResourceCache(TTLCache):
def __missing__(self, key):
loop = asyncio.get_event_loop()
resource = loop.run_until_complete(_get_expensive_resource(key))
self[key] = resource
return resource
resource_cache = ResourceCache(124, 300)
@app.get('/')
async def get(key: str):
return resource_cache[key]
if __name__ == "__main__":
import uvicorn
uvicorn.run("test2:app")
但是,这失败了,因为据我了解,该__missing__
方法是同步的,您不能从异步中的同步中调用异步。错误是:
RuntimeError: this event loop is already running.
如果我使用纯 asyncio 而不是 uvloop,则会发生类似的错误。
对于 asyncio 事件循环,我尝试过使用nest_asyncio
包,但它没有打补丁uvloop
,而且,即使将它与 asyncio 一起使用,第一次使用后服务似乎也会冻结。
你知道我怎么能完成这个吗?