我正在尝试做一些非常简单的事情,当涉及 Pyspark 时,它会以某种方式转化为非常困难的事情。
我的平台上有一个非常大的数据框(~2B 行),我不允许下载,只能使用 Pyspark 代码进行分析。数据框包含去年欧洲一些物体的位置,我想计算这些物体随时间的密度。我过去曾成功使用过该功能并取得了不错的效果(至少numpy.histogram2d
我发现它的速度更快)。由于在我定义了一个 UDF 来计算密度并返回一个新的数据帧numpy
中没有等效的这个函数。pyspark
这在我只处理几行时有效(我尝试过 100K 行):
import pandas as pd
import numpy as np
def compute_density(df):
lon_bins = np.linspace(-15, 45, 100)
lat_bins = np.linspace(35, 70, 100)
density, xedges, yedges = np.histogram2d(df["corrected_latitude_degree"].values,
df["corrected_longitude_degree"].values,
[lat_bins, lon_bins])
x2d, y2d = np.meshgrid(xedges[:-1], yedges[:-1])
x_out = x2d.ravel()
y_out = y2d.ravel()
density_out = density.ravel()
data = {
'latitude': x_out,
'longitude': y_out,
'density': density_out
}
return pd.DataFrame(data)
然后我称之为
schema = StructType([
StructField("latitude", DoubleType()),
StructField("longitude", DoubleType()),
StructField("density", DoubleType())
])
preproc = (
inp
.limit(100000)
.withColumn("groups", F.lit(0))
)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_density_udf(df):
return compute_density(df)
result = preproc.groupby(["groups"]).apply(compute_density_udf)
为什么我使用GROUPED_MAP
版本来应用 UDF?当返回一个模式时,我没有设法让它与SCALAR
UDF 类型一起工作,尽管我真的不需要分组。
当我尝试在完整数据集上使用此 UDF 时,我得到了一个 OOM,因为我相信只有一个组并且 UDF 无法处理太多。我确信有一种更聪明的方法可以在pyspark
没有 UDF 的情况下直接计算它,或者分成几组,然后在最后组装结果?有没有人有任何想法/建议?