我正在使用 FastAPI 开发一个 API,用户可以向该 API 发出请求,以便发生以下情况:
- 首先,get 请求将从 Google Cloud Storage 中获取文件并将其加载到 pyspark DataFrame
- 然后应用程序将对 DataFrame 执行一些转换
- 最后,我想将 DataFrame 作为 parquet 文件写入用户的磁盘。
由于以下几个原因,我不太清楚如何以 parquet 格式将文件传递给用户:
df.write.parquet('out/path.parquet')
将数据写入一个目录,out/path.parquet
当我尝试将其传递给该目录时会遇到挑战starlette.responses.FileResponse
- 传递我知道存在的单个 .parquet 文件
starlette.responses.FileResponse
似乎只是将二进制文件打印到我的控制台(如下面的代码所示) - 将 DataFrame 写入 Pandas 中的 BytesIO 流似乎很有希望,但我不太清楚如何使用 DataFrame 的任何方法或 DataFrame.rdd 的方法来做到这一点。
这在 FastAPI 中是否可行?在 Flask 中可以使用send_file()吗?
这是我到目前为止的代码。请注意,我已经尝试了一些类似注释代码的方法,但均无济于事。
import tempfile
from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse
router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)
df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')
@router.get("/applications")
def applications():
df.write.parquet("temp.parquet", compression="snappy")
return FileResponse("part-some-compressed-file.snappy.parquet")
# with tempfile.TemporaryFile() as f:
# f.write(df.rdd.saveAsPickleFile("temp.parquet"))
# return FileResponse("test.parquet")
谢谢!
编辑:我尝试使用此处提供的答案和信息,但我无法让它正常工作。