UploadFile 使用 Python 的SpooledTemporaryFile,一个“存储在内存中的文件”,并且“一旦关闭就会被销毁”。有关这方面的更多信息,请查看此答案。要在途中解决问题(即,从 csv 文件中读取而不使用可以从中获取的文件内容contents = await file.read()
),您可以将文件内容复制到NamedTemporaryFile 中(再次查看此答案以获取更多信息),然后用它来遍历 csv 内容。下面是一个工作示例:
import uvicorn
from fastapi import FastAPI, File, UploadFile
from tempfile import NamedTemporaryFile
import os
import csv
app = FastAPI()
@app.post("/upload/")
async def upload(file: UploadFile = File(...)):
contents = await file.read()
data = {}
file_copy = NamedTemporaryFile(delete=False)
try:
with file_copy as f: # The 'with' block ensures that the file closes and data are stored
f.write(contents);
with open(file_copy.name,'r', encoding='utf-8') as csvf:
csvReader = csv.DictReader(csvf)
for rows in csvReader:
key = rows['No']
data[key] = rows
finally:
file_copy.close() # Remember to close any file instances before removing the temp file
os.unlink(file_copy.name) # unlink (remove) the file from the system's Temp folder
return data
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=8000, debug=True)
更新
或者,如前所述,一个更优雅的解决方案是使用现有文件的字节内容。字节数据按此处所述进行解码。请参见下面的示例:
import uvicorn
from fastapi import FastAPI, File, UploadFile
import os
import csv
from io import StringIO
app = FastAPI()
@app.post("/upload/")
async def upload(file: UploadFile = File(...)):
data = {}
contents = await file.read()
decoded = contents.decode()
csvf = StringIO(decoded)
csvReader = csv.DictReader(csvf)
for rows in csvReader:
key = rows['No']
data[key] = rows
return data
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=8000, debug=True)