0

我正在尝试做一些非常简单的事情,当涉及 Pyspark 时,它会以某种方式转化为非常困难的事情。

我的平台上有一个非常大的数据框(~2B 行),我不允许下载,只能使用 Pyspark 代码进行分析。数据框包含去年欧洲一些物体的位置,我想计算这些物体随时间的密度。我过去曾成功使用过该功能并取得了不错的效果(至少numpy.histogram2d我发现它的速度更快)。由于在我定义了一个 UDF 来计算密度并返回一个新的数据帧numpy中没有等效的这个函数。pyspark这在我只处理几行时有效(我尝试过 100K 行):

import pandas as pd
import numpy as np

def compute_density(df):
    lon_bins = np.linspace(-15, 45, 100)
    lat_bins = np.linspace(35, 70, 100)

    density, xedges, yedges = np.histogram2d(df["corrected_latitude_degree"].values,
                                             df["corrected_longitude_degree"].values,
                                             [lat_bins, lon_bins])
    x2d, y2d = np.meshgrid(xedges[:-1], yedges[:-1])
    x_out = x2d.ravel()
    y_out = y2d.ravel()
    density_out = density.ravel()
    data = {
            'latitude': x_out,
            'longitude': y_out,
            'density': density_out
            }
    return pd.DataFrame(data)

然后我称之为

schema = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("density", DoubleType())
])

preproc = (
    inp
    .limit(100000)
    .withColumn("groups", F.lit(0))
)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_density_udf(df):
    return compute_density(df)

result = preproc.groupby(["groups"]).apply(compute_density_udf)

为什么我使用GROUPED_MAP版本来应用 UDF?当返回一个模式时,我没有设法让它与SCALARUDF 类型一起工作,尽管我真的不需要分组。

当我尝试在完整数据集上使用此 UDF 时,我得到了一个 OOM,因为我相信只有一个组并且 UDF 无法处理太多。我确信有一种更聪明的方法可以在pyspark没有 UDF 的情况下直接计算它,或者分成几组,然后在最后组装结果?有没有人有任何想法/建议?

4

0 回答 0