我想将存储在pyspark.sql.dataframe.DataFrame
“ ddf
”中的交易按列“ key
”分组,该列表示交易的来源(在本例中为客户 ID)。
分组是一个相当昂贵的过程,所以我想以嵌套模式将组写入磁盘:
(key, [[c1, c2, c3,...], ...])
这将允许我快速加载密钥上的所有事务,并开发复杂的自定义聚合器,而无需重新运行分组。
如何创建嵌套模式并将其写入磁盘?
我想将存储在pyspark.sql.dataframe.DataFrame
“ ddf
”中的交易按列“ key
”分组,该列表示交易的来源(在本例中为客户 ID)。
分组是一个相当昂贵的过程,所以我想以嵌套模式将组写入磁盘:
(key, [[c1, c2, c3,...], ...])
这将允许我快速加载密钥上的所有事务,并开发复杂的自定义聚合器,而无需重新运行分组。
如何创建嵌套模式并将其写入磁盘?
我花了很长时间才弄清楚这一点,尽管答案很简单,所以我想我会在这里发布我的解决方案。
key
首先通过(客户ID)减少所有交易:
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],) ).reduceByKey(add)
这给出了一个rdd
看起来像(key, [list of Rows])
. 要将其写回,dataframe
您需要构建模式。事务列表可以由 建模ArrayType
。
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
然后以这种结构将数据写入磁盘很简单:
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
性能似乎还可以。在不通过 RDD 的情况下找不到这样做的方法。