27

在服务 FastAPI 请求时,我有一个 CPU 密集型任务要对列表的每个元素执行。我想在多个 CPU 内核上进行此处理。

在 FastAPI 中执行此操作的正确方法是什么?我可以使用标准multiprocessing模块吗?到目前为止,我发现的所有教程/问题仅涵盖了 I/O 绑定任务,例如 Web 请求。

4

1 回答 1

53

async def端点

您可以使用带有ProcessPoolExecutor的loop.run_in_executor在单独的进程中启动函数。

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def端点

由于def端点在单独的线程中隐式运行,因此您可以使用模块multiprocessingconcurrent.futures的全部功能。注意里面的def函数,await不能用。样品:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

注意应该记住,在端点中创建进程池以及创建大量线程会随着请求数量的增加而导致响应速度变慢。


即时执行

在单独的进程中执行函数并立即等待结果的最简单和最原生的方法是将loop.run_in_executorProcessPoolExecutor一起使用。

可以在应用程序启动时创建一个池,如下例所示,并且不要忘记在应用程序退出时关闭。可以使用max_workers ProcessPoolExecutor构造函数参数设置池中使用的进程数。如果max_workers给出None或没有给出,它将默认为机器上的处理器数量。

这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态。如果由于某种原因连接丢失,那么结果将无处可返回。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

移至背景

通常,CPU 密集型任务在后台执行。FastAPI 提供了在返回响应运行后台任务的能力,您可以在其中启动并异步等待 CPU 绑定任务的结果。

在这种情况下,例如,您可以立即返回"Accepted"(HTTP 代码 202)和唯一任务的响应,ID在后台继续计算,客户端稍后可以使用 this 请求任务的状态ID

BackgroundTasks提供一些特性,特别是你可以运行其中的几个(包括在依赖项中)。在它们中,您可以使用在依赖项中获得的资源,这些资源只有在所有任务完成后才会被清理,而在出现异常时可以正确处理它们。在这张中可以更清楚地看到这一点。

下面是一个执行最小任务跟踪的示例。假设应用程序的一个实例正在运行。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

更强大的解决方案

上面所有的例子都很简单,但是如果你需要一些更强大的系统来进行繁重的分布式计算,那么你可以看看消息代理RabbitMQ,等等。以及使用它们的库,比如 Celery KafkaNATS

于 2020-07-30T10:14:59.887 回答