0

我正在使用 FastAPI 开发一个 API,用户可以向该 API 发出请求,以便发生以下情况:

  1. 首先,get 请求将从 Google Cloud Storage 中获取文件并将其加载到 pyspark DataFrame
  2. 然后应用程序将对 DataFrame 执行一些转换
  3. 最后,我想将 DataFrame 作为 parquet 文件写入用户的磁盘。

由于以下几个原因,我不太清楚如何以 parquet 格式将文件传递给用户:

  • df.write.parquet('out/path.parquet')将数据写入一个目录,out/path.parquet当我尝试将其传递给该目录时会遇到挑战starlette.responses.FileResponse
  • 传递我知道存在的单个 .parquet 文件starlette.responses.FileResponse似乎只是将二进制文件打印到我的控制台(如下面的代码所示)
  • 将 DataFrame 写入 Pandas 中的 BytesIO 流似乎很有希望,但我不太清楚如何使用 DataFrame 的任何方法或 DataFrame.rdd 的方法来做到这一点。

这在 FastAPI 中是否可行?在 Flask 中可以使用send_file()吗?

这是我到目前为止的代码。请注意,我已经尝试了一些类似注释代码的方法,但均无济于事。

import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')

@router.get("/applications")
def applications():
    df.write.parquet("temp.parquet", compression="snappy")
    return FileResponse("part-some-compressed-file.snappy.parquet")
    # with tempfile.TemporaryFile() as f:
    #     f.write(df.rdd.saveAsPickleFile("temp.parquet"))
    #     return FileResponse("test.parquet")

谢谢!

编辑:我尝试使用此处提供的答案和信息,但我无法让它正常工作。

4

1 回答 1

0

我能够解决这个问题,但它远非优雅。如果有人可以提供不写入磁盘的解决方案,我将不胜感激,并将选择您的答案作为正确答案。

我能够使用 序列化 DataFrame df.rdd.saveAsPickleFile(),压缩生成的目录,将其传递给 python 客户端,将生成的 zipfile 写入磁盘,解压缩,然后SparkContext().pickleFile在最终加载 DataFrame 之前使用。我认为远非理想。

接口:

import shutil
import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/my-file.parquet')

@router.get("/applications")
def applications():
    temp_parquet = tempfile.NamedTemporaryFile()
    temp_parquet.close()
    df.rdd.saveAsPickleFile(temp_parquet.name)

    shutil.make_archive('test', 'zip', temp_parquet.name)

    return FileResponse('test.zip')

客户:

import io
import zipfile

import requests

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

response = requests.get("http://0.0.0.0:5000/applications")
file_like_object = io.BytesIO(response.content)
with zipfile.ZipFile(file_like_object) as z:
    z.extractall('temp.data')

rdd = sc.pickleFile("temp.data")
df = spark.createDataFrame(rdd)

print(df.head())
于 2020-01-16T01:59:38.617 回答