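I have provisioned and configured a Fedora 34 VM on VirtualBox with 2048 MB of RAM to serve a FastAPI application on localhost:7070. The full application source code, dependency code, and instructions are here. Below is the smallest reproducible example I could make.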

main.py

import os, pathlib

import fastapi as fast
import aiofiles

        
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = pathlib.Path('/'.join((ROOT_DIR, 'results')))

    
app = fast.FastAPI()

    
@app.post('/api')
async def upload(
    request: fast.Request, 
    file: fast.UploadFile = fast.File(...),
    filedir: str = ''):
        
    dest = RESULTS_DIR.joinpath(filedir, file.filename)
    dest.parent.mkdir(parents=True, exist_ok=True)

    async with aiofiles.open(dest, 'wb') as buffer:
        await file.seek(0)
        contents = await file.read()
        await buffer.write(contents)

    return f'localhost:7070/{dest.parent.name}/{dest.name}'

start.sh (starts the server application)

#! /bin/bash
uvicorn --host "0.0.0.0" --log-level debug --port 7070 main:app

client.py

import httpx
from pathlib import Path
import asyncio

async def async_post_file_req(url: str, filepath: Path):
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(write=None, read=None, connect=None, pool=None)) as client:
        # Open the file in a context manager so the handle is closed after the upload
        with filepath.open('rb') as f:
            r = await client.post(
                url,
                files={
                    'file': (filepath.name, f, 'application/octet-stream')
                }
            )

if __name__ == '__main__':
    url = 'http://localhost:7070'
    asyncio.run(
        async_post_file_req(
            f'{url}/api',
            # expanduser() resolves '~'; Path does not expand it on its own
            Path('~/1500M.txt').expanduser()
    ))

Create a 1500 MB file

truncate -s 1500M 1500M.txt

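When I upload a 1500 MB file, the current implementation of upload appears to read the whole file into memory, and then the server responds with {status: 400, reason: 'Bad Request', details: 'There was an error parsing the body.'}. The file is not written to disk. When I upload an 825 MB file, the server responds with 200 and the file is written to disk. I don't understand why parsing fails for the larger file.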

What is going on here?

How do I upload files that are larger than the machine's available memory?

Do I have to stream the body?

1 Answer

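Digging into the source code, I found that FastAPI raises an HTTP exception with status code 400 and the detail "There was an error parsing the body" while it is trying to work out whether it needs to read the request form or the request body. A FastAPI request is basically a Starlette request, so I reimplemented the FastAPI server application as a Starlette application, hoping that it would bypass this exception handler and give me more information about the problem.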

main.py

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route


async def homepage(request):
    return JSONResponse({'hello': 'world'})


async def upload(request):
    form = await request.form()
    print(type(form['upload_file']))
    filename = form['upload_file'].filename or 'not found'
    contents = await form['upload_file'].read()
    b = len(contents) or -1
    return JSONResponse({
        'filename': filename,
        'bytes': b
    })


app = Starlette(debug=True, routes=[
    Route('/', homepage),
    Route('/api', upload, methods=['POST'])
])

Pipfile

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
starlette = "*"
uvicorn = "*"
uvloop = "*"
httpx = "*"
watchgod = "*"
python-multipart = "*"

[dev-packages]

[requires]
python_version = "3.9"

When posting files of 989 MiB or larger, the Starlette application raises OS error 28, "No space left on device". Files of 988 MiB or smaller do not cause the error.

INFO:     10.0.2.2:46996 - "POST /api HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 398, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.9/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/usr/local/lib/python3.9/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 580, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 241, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 52, in app
    response = await func(request)
  File "/home/vagrant/star-file-server/./main.py", line 11, in upload
    form = await request.form()
  File "/usr/local/lib/python3.9/site-packages/starlette/requests.py", line 240, in form
    self._form = await multipart_parser.parse()
  File "/usr/local/lib/python3.9/site-packages/starlette/formparsers.py", line 231, in parse
    await file.write(message_bytes)
  File "/usr/local/lib/python3.9/site-packages/starlette/datastructures.py", line 445, in write
    await run_in_threadpool(self.file.write, data)
  File "/usr/local/lib/python3.9/site-packages/starlette/concurrency.py", line 40, in run_in_threadpool
    return await loop.run_in_executor(None, func, *args)
  File "/usr/lib64/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/lib64/python3.9/tempfile.py", line 755, in write
    rv = file.write(s)
OSError: [Errno 28] No space left on device
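
The OSError in this traceback is presumably what FastAPI reported as a 400. The following is a minimal sketch of how a catch-all around body parsing can mask an error like this. The names HTTPError, parse_body, and handle_request are hypothetical stand-ins for illustration, not FastAPI's actual code.

```python
import errno


class HTTPError(Exception):
    """Hypothetical stand-in for a framework's HTTP exception."""

    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def parse_body():
    # Simulates the temp-file write failing while the multipart body is parsed.
    raise OSError(errno.ENOSPC, 'No space left on device')


def handle_request():
    try:
        return parse_body()
    except Exception as exc:
        # A catch-all like this reports every failure as a client error;
        # the real cause survives only as the chained exception.
        raise HTTPError(400, 'There was an error parsing the body') from exc
```

Inspecting `__cause__` on the raised exception recovers the original OSError, which is the information the Starlette reimplementation exposed directly.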

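Starlette's UploadFile data structure uses a SpooledTemporaryFile. That object writes to your operating system's temporary directory. My temporary directory is /tmp because I am on Fedora 34 and I have not set any environment variable that tells Python to use anything else as the temporary directory.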

[vagrant@fedora star-file-server]$ python
Python 3.9.5 (default, May 14 2021, 00:00:00) 
[GCC 11.1.1 20210428 (Red Hat 11.1.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tempfile
>>> tempfile.gettempdir()
'/tmp'
[vagrant@fedora star-file-server]$ df -h
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        974M     0  974M   0% /dev
tmpfs           989M  168K  989M   1% /dev/shm
tmpfs           396M  5.6M  390M   2% /run
/dev/sda1        40G  1.6G   36G   5% /
tmpfs           989M     0  989M   0% /tmp
tmpfs           198M   84K  198M   1% /run/user/1000
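
The same check can be done from Python. This is a small sketch using only the standard library; the helper names temp_dir_free_bytes and fits_in_temp_dir are made up for illustration.

```python
import shutil
import tempfile


def temp_dir_free_bytes() -> int:
    """Return the free space, in bytes, on the filesystem holding the temp directory."""
    return shutil.disk_usage(tempfile.gettempdir()).free


def fits_in_temp_dir(upload_size: int) -> bool:
    """Report whether an upload of upload_size bytes could be spooled to the temp directory."""
    return upload_size <= temp_dir_free_bytes()


if __name__ == '__main__':
    print(tempfile.gettempdir(), temp_dir_free_bytes())
```

On the VM above this would report roughly the 989M that df shows as available on /tmp.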

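Starlette sets max_size for the SpooledTemporaryFile to 1 MiB. From the Python tempfile documentation, I understand this to mean that the upload is held in memory only until it exceeds 1 MiB, at which point it is rolled over to a file in the temporary directory. So although the threshold is 1 MiB, 989 MiB appears to be the correct hard limit on the size of an UploadFile, because the SpooledTemporaryFile is constrained by the storage available in the system's temporary directory.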
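
The effect of max_size can be seen in a small standard-library sketch. It uses a 16-byte threshold instead of 1 MiB to keep the demo tiny, and it reads `_rolled`, which is a private CPython attribute, so treat it as an illustration rather than a supported API.

```python
import tempfile

MAX_SIZE = 16  # tiny threshold for the demo; the answer reports 1 MiB for Starlette


def rolled_over_after(n_bytes: int) -> bool:
    """Write n_bytes to a spooled file and report whether it moved to disk."""
    with tempfile.SpooledTemporaryFile(max_size=MAX_SIZE) as f:
        f.write(b'x' * n_bytes)
        # _rolled is a private CPython attribute: True once the data lives
        # in a real file in the temp directory instead of in memory.
        return f._rolled
```

Writing exactly MAX_SIZE bytes stays in memory; one byte more triggers the rollover, and from then on every write lands in the temporary directory.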

If I still want to use UploadFile, I can set an environment variable that points to a device known to always have enough free space, even for the largest uploads.

export TMPDIR=/huge_storage_device
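
To check that Python actually picks up the variable, here is a small sketch. The helper name temp_dir_with_env is hypothetical, and resetting tempfile.tempdir is only needed because tempfile caches its first answer; a freshly started server process reads TMPDIR on its own.

```python
import os
import tempfile


def temp_dir_with_env(tmpdir: str) -> str:
    """Return the directory tempfile picks when TMPDIR is set to tmpdir."""
    old_env = os.environ.get('TMPDIR')
    old_cached = tempfile.tempdir
    try:
        os.environ['TMPDIR'] = tmpdir
        tempfile.tempdir = None  # clear the cached value so TMPDIR is re-read
        return tempfile.gettempdir()
    finally:
        # Restore the previous environment and cache so callers are unaffected.
        if old_env is None:
            os.environ.pop('TMPDIR', None)
        else:
            os.environ['TMPDIR'] = old_env
        tempfile.tempdir = old_cached
```

The directory has to exist and be writable, otherwise tempfile falls back to its other candidates such as /tmp.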

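The approach I prefer uses the request's stream(), which avoids writing the file twice: first to a local temporary directory and then to a local permanent directory.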

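import os, pathlib

import fastapi as fast
import aiofiles

# Same results directory as in the question's main.py
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = pathlib.Path('/'.join((ROOT_DIR, 'results')))

app = fast.FastAPI()


@app.post('/stream')
async def stream(
    request: fast.Request,
    filename: str,
    filedir: str = ''
):

    dest = RESULTS_DIR.joinpath(filedir, filename)
    dest.parent.mkdir(parents=True, exist_ok=True)

    async with aiofiles.open(dest, 'wb') as buffer:
        # Write each chunk as it arrives so the whole body is never held in memory
        async for chunk in request.stream():
            await buffer.write(chunk)

    return {
        'loc': f'localhost:7070/{dest.parent.name}/{dest.name}'
    }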
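
A client for this endpoint has to send the file as the raw request body rather than as multipart form data, since request.stream() yields the body bytes as they are. Here is a minimal sketch, assuming httpx's content= argument with an async generator; iter_file_chunks and post_stream are hypothetical helper names, not part of the code above.

```python
import asyncio
from pathlib import Path
from typing import AsyncIterator


async def iter_file_chunks(path: Path, chunk_size: int = 1024 * 1024) -> AsyncIterator[bytes]:
    """Yield the file in fixed-size chunks so it is never held in memory whole."""
    # Plain blocking reads keep the sketch short; each read is at most chunk_size bytes.
    with path.open('rb') as f:
        while chunk := f.read(chunk_size):
            yield chunk


async def post_stream(url: str, path: Path) -> int:
    """POST the file to {url}/stream as a streamed body and return the status code."""
    import httpx  # third-party; imported here so the generator above works without it

    async with httpx.AsyncClient(timeout=None) as client:
        r = await client.post(
            f'{url}/stream',
            params={'filename': path.name},  # the endpoint reads filename from the query string
            content=iter_file_chunks(path),
        )
        return r.status_code
```

It could be called as `asyncio.run(post_stream('http://localhost:7070', Path('~/1500M.txt').expanduser()))`.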

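With this approach, when I uploaded files (5M, 450M, and 988M, with two repeated measurements each) to the server running on a Fedora VM with 2048 MiB of memory, the server never used too much memory and never crashed, and the average latency dropped by 40% (that is, the latency of posting to /stream was about 60% of the latency of posting to /api).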

Answered 2021-05-28T13:52:24.897