1
schema = StructType([
    StructField("title", StringType(), False),
    StructField("stringdataA", StringType(), False),
#     StructField("list", ArrayType( StructType([
#         StructField("A", IntegerType()  , False),
#         StructField("B", StringType()   , False),
#         StructField("C", TimestampType(), False)
#     ]))),
    StructField("stringdataB",  StringType(), False)])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def make_data(x):
        ~~ make data fitted in shcema

groupedList = df.groupby("groupkey").apply(make_data)

'make_data' 函数将生成适合我定义的模式的数据,但是当我在模式中添加 list(map()) 结构字段时。它给了我一个如下所示的错误。那真的不支持架构结构吗?

有什么方法可以获取我可以处理的 list(map()) 结构数据吗?

NotImplementedError:使用分组映射 Pandas UDF 的 returnType 无效:StructType(List(StructField(title,StringType,false),StructField(stringdataA,StringType,false),StructField(list,ArrayType(StructType(List(StructField(A,IntegerType,false) ,StructField(B,StringType,false),StructField(C,TimestampType,false))),true),true),StructField(stringdataB,StringType,false))) 不支持

4

2 回答 2

1

我认为您的列表元素StructType不受支持:

https://github.com/apache/spark/blob/4a4e7aeca79738d5788628d67d97d704f067e8d7/python/pyspark/sql/types.py#L1581

如果你想确认,试着打电话pyspark.sql.types.to_arrow_schema(schema)看看会发生什么。

于 2019-02-25T12:19:10.277 回答
0

由于不支持 StructType,一种解决方法是在返回数据之前使用 json.dumps(data) 转储数据。此模式将具有 StringType() 字段。

稍后您可以使用 json.loads() 转换为 Array/list

于 2020-03-03T18:17:21.520 回答