2

最好的

目前我正在尝试使用 pyspark pandas_udf,但不幸的是,当我返回包含以下内容的 DataFrame 时遇到了一些问题:NA、None 或 NaNs。如果我使用的是 FloatType,那么结果是好的,但是一旦我使用 IntegerType、TimestampType 等......我收到一个错误并且它不再工作了。

以下是一些有效和无效的示例:

What does work?
示例 1)

custom_schema = StructType([
                        StructField('User',StringType(),True),
                        StructField('Sport',StringType(),True),
                        StructField('Age',IntegerType(),True),
                        StructField('Age_lag',FloatType(),True),
                        ])

# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
    # Input/output are both a pandas.DataFrame

    #return a totalaly different DataFrame
    dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
    dt['Age_lag'] = dt['Age'].shift(1)

    return dt

df.groupby('id').apply(my_custom_function).toPandas() 

结果:

    User    Sport   Age     Age_lag
0   Alice   Football    27  NaN
1   Bob     Basketball  34  27.0
2   Alice   Football    27  NaN
3   Bob     Basketball  34  27.0

例 2)

如果我们将 Age_lag 的类型更改为IntegerType ()并用 -1 填充 Na,那么我们仍然有一个有效的结果(没有 NaN)

custom_schema = StructType([
                        StructField('User',StringType(),True),
                        StructField('Sport',StringType(),True),
                        StructField('Age',IntegerType(),True),
                        StructField('Age_lag',IntegerType(),True),
                        ])

# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
    # Input/output are both a pandas.DataFrame

    #return a totalaly different DataFrame
    dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
    dt['Age_lag'] = dt['Age'].shift(1).fillna(-1)

    return dt

df.groupby('id').apply(my_custom_function).toPandas() 

结果:

    User    Sport   Age     Age_lag
0   Alice   Football    27  -1
1   Bob     Basketball  34  27
2   Alice   Football    27  -1
3   Bob     Basketball  34  27



什么不起作用?

例 3)

如果我们省略 .fillna(-1)那么我们会收到下一个错误

custom_schema = StructType([
                        StructField('User',StringType(),True),
                        StructField('Sport',StringType(),True),
                        StructField('Age',IntegerType(),True),
                        StructField('Age_lag',IntegerType(),True),
                        ])

# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
    # Input/output are both a pandas.DataFrame

    #return a totalaly different DataFrame
    dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
    dt['Age_lag'] = dt['Age'].shift(1)

    return dt

df.groupby('id').apply(my_custom_function).toPandas() 

结果:pyarrow.lib.ArrowInvalid:浮点值被截断



例 4)

最后但并非最不重要的一点是,如果我们只是将一个静态数据帧发回 age_lag 包含None,那么它也不起作用。

from pyspark.sql.types import StructType,NullType, StructField,FloatType, LongType, DoubleType, StringType, IntegerType
# true means,  accepts nulls
custom_schema = StructType([
                        StructField('User',StringType(),True),
                        StructField('Sport',StringType(),True),
                        StructField('Age',IntegerType(),True),
                        StructField('Age_lag',IntegerType(),True),
                        ])

# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
    # Input/output are both a pandas.DataFrame

    #return a totalaly different DataFrame
    dt = pd.DataFrame({'User': ['Alice', 'Bob'], 
                      'Sport': ['Football', 'Basketball'], 
                        'Age': [27, 34], 
                    'Age_lag': [27, None]})


    return dt

df.groupby('id').apply(my_custom_function).toPandas() 

问题

  • 你如何处理这个问题?
  • 这是一个糟糕的设计吗?
    • (因为我可以想象 1000 个我确实想返回 NaN 和 None 的情况)
  • 我们真的必须填写所有缺失值吗?然后再把它们换回来?还是使用浮点数而不是整数?ETC?
  • 这会在不久的将来得到解决吗?(因为 pandas_udf 很新)
4

0 回答 0