4

我正在尝试从 pandas_udf 返回一个特定的结构。它在一个集群上工作,但在另一个集群上失败。我尝试在组上运行 udf,这要求返回类型是数据框。

from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *

schema = StructType([
  StructField("Distance", FloatType()),
  StructField("CarId", IntegerType())

])


def haversine(lon1, lat1, lon2, lat2):
    #Calculate distance, return scalar
    return 3.5 # Removed logic to facilitate reading


@pandas_udf(schema)
def totalDistance(oneCar):
    dist = haversine(oneCar.Longtitude.shift(1),
                     oneCar.Latitude.shift(1),
                     oneCar.loc[1:, 'Longitude'], 
                     oneCar.loc[1:, 'Latitude'])

    return pd.DataFrame({"CarId":oneCar['CarId'].iloc[0],"Distance":np.sum(dist)},index = [0])


## Calculate the overall distance made by each car
distancePerCar= df.groupBy('CarId').apply(totalDistance)

这是我得到的例外:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    114             try:
--> 115                 to_arrow_type(self._returnType_placeholder)
    116             except TypeError:

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
   1641     else:
-> 1642         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
   1643     return arrow_type

TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
     18     km = 6367 * c
     19     return km
---> 20 @pandas_udf("CarId: int, Distance: float")
     21 def totalDistance(oneUser):
     22     dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
     62     udf_obj = UserDefinedFunction(
     63         f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64     return udf_obj._wrapped()
     65 
     66 

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
    184 
    185         wrapper.func = self.func
--> 186         wrapper.returnType = self.returnType
    187         wrapper.evalType = self.evalType
    188         wrapper.deterministic = self.deterministic

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    117                 raise NotImplementedError(
    118                     "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119                     "not supported" % str(self._returnType_placeholder))
    120         elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    121             if isinstance(self._returnType_placeholder, StructType):

NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported

我也尝试将架构更改为

@pandas_udf("<CarId:int,Distance:float>")

@pandas_udf("CarId:int,Distance:float")

但得到同样的例外。我怀疑这与我的 pyarrow 版本有关,它与我的 pyspark 版本不兼容。

任何帮助,将不胜感激。谢谢!

4

1 回答 1

7

正如错误消息(“Invalid returnType with scalar Pandas UDFs”)中所报告的,您正在尝试创建一个 SCALAR 矢量化 pandas UDF,但使用的是 StructType 模式并返回一个 pandas DataFrame。

您应该将您的函数声明为 GROUPED MAP pandas UDF,即:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

pyspark 文档中解释了标量和分组矢量化 UDF 之间的区别:http: //spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf

标量 UDF 定义了一个转换:一个或多个 pandas.Series -> 一个 pandas.Series。returnType 应该是原始数据类型,例如 DoubleType()。返回的 pandas.Series 的长度必须与输入的 pandas.Series 的长度相同。

总而言之,标量 pandas UDF 一次处理一列(pandas 系列),比一次处理一个行元素的传统 UDF 具有更好的性能。请注意,性能改进是由于使用 PyArrow 的高效 python 序列化。

分组映射 UDF 定义转换: Pandas.DataFrame -> Pandas.DataFrame returnType 应该是描述返回的 pandas.DataFrame 架构的 StructType。返回的 pandas.DataFrame 的长度可以是任意的,并且必须对列进行索引,以便它们的位置与架构中的相应字段匹配。

分组的 pandas UDF 一次处理多行和多列(使用 pandas DataFrame,不要与 Spark DataFrame 混淆),对于多变量操作非常有用和高效(尤其是在使用本地 python 数值分析和机器学习库时numpy、scipy、scikit-learn 等)。在这种情况下,输出是具有多列的单行 DataFrame。

请注意,我没有检查代码的内部逻辑,只检查方法。

于 2018-09-18T10:46:00.643 回答