最好的
目前我正在尝试使用 pyspark pandas_udf,但不幸的是,当我返回包含以下内容的 DataFrame 时遇到了一些问题:NA、None 或 NaNs。如果我使用的是 FloatType,那么结果是好的,但是一旦我使用 IntegerType、TimestampType 等......我收到一个错误并且它不再工作了。
以下是一些有效和无效的示例:
What does work?
示例 1)
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',FloatType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
dt['Age_lag'] = dt['Age'].shift(1)
return dt
df.groupby('id').apply(my_custom_function).toPandas()
结果:
User Sport Age Age_lag
0 Alice Football 27 NaN
1 Bob Basketball 34 27.0
2 Alice Football 27 NaN
3 Bob Basketball 34 27.0
例 2)
如果我们将 Age_lag 的类型更改为IntegerType ()并用 -1 填充 Na,那么我们仍然有一个有效的结果(没有 NaN)
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',IntegerType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
dt['Age_lag'] = dt['Age'].shift(1).fillna(-1)
return dt
df.groupby('id').apply(my_custom_function).toPandas()
结果:
User Sport Age Age_lag
0 Alice Football 27 -1
1 Bob Basketball 34 27
2 Alice Football 27 -1
3 Bob Basketball 34 27
什么不起作用?
例 3)
如果我们省略 .fillna(-1)那么我们会收到下一个错误
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',IntegerType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'], 'Sport': ['Football', 'Basketball'], 'Age': [27, 34]})
dt['Age_lag'] = dt['Age'].shift(1)
return dt
df.groupby('id').apply(my_custom_function).toPandas()
结果:pyarrow.lib.ArrowInvalid:浮点值被截断
例 4)
最后但并非最不重要的一点是,如果我们只是将一个静态数据帧发回 age_lag 包含None,那么它也不起作用。
from pyspark.sql.types import StructType,NullType, StructField,FloatType, LongType, DoubleType, StringType, IntegerType
# true means, accepts nulls
custom_schema = StructType([
StructField('User',StringType(),True),
StructField('Sport',StringType(),True),
StructField('Age',IntegerType(),True),
StructField('Age_lag',IntegerType(),True),
])
# the schema is what it needs as an output format
@pandas_udf(custom_schema, PandasUDFType.GROUPED_MAP)
def my_custom_function(pdf):
# Input/output are both a pandas.DataFrame
#return a totalaly different DataFrame
dt = pd.DataFrame({'User': ['Alice', 'Bob'],
'Sport': ['Football', 'Basketball'],
'Age': [27, 34],
'Age_lag': [27, None]})
return dt
df.groupby('id').apply(my_custom_function).toPandas()
问题:
- 你如何处理这个问题?
- 这是一个糟糕的设计吗?
- (因为我可以想象 1000 个我确实想返回 NaN 和 None 的情况)
- 我们真的必须填写所有缺失值吗?然后再把它们换回来?还是使用浮点数而不是整数?ETC?
- 这会在不久的将来得到解决吗?(因为 pandas_udf 很新)