我正在使用 Spark Koalas 探索和分析大型产品数据集。我想做的一件事是对产品编号进行排序,并根据其编号添加具有产品排名的列。这是一个示例数据集:
import databricks.koalas as ks
ks.set_option('compute.default_index_type', 'distributed')
ks.set_option('compute.ops_on_diff_frames', True)
data_df = ks.DataFrame(
{
'client': ['client1', 'client1', 'client1', 'client1'],
'product': ['prod11', 'prod11', 'prod11', 'prod11'],
'prod_number': ['2.1.0.M1', '2.2.0.M2', '2.10.0.M3', '2.12.0.M4'],
},
columns=['client', 'product', 'prod_number']
)
看起来像这样:
client product prod_number
0 client1 prod11 2.1.0.M1
1 client1 prod11 2.2.0.M1
2 client1 prod11 2.10.0.M1
3 client1 prod11 2.12.0.M1
为了对产品编号进行排序和排名,我使用natsort。不幸的是,koalas 数据框 sort_values 方法不接受像 pandas 这样的关键参数,我可以利用它并将natsort_keygen()
其作为关键函数传递,所以我正在执行以下操作:
- 我正在创建一个包含产品编号及其排名的新数据框
- 将这个新数据框与原始数据框连接起来,如下所示:
prod_rank_df = ks.DataFrame(
{'prod_number': np.array(order_by_index(data_df['prod_number'].to_numpy(), index_natsorted(data_df['prod_number'].to_numpy(), reverse=True))),
'prod_rank': np.arange(1, len(data_df['prod_number'])+1)
},
columns=['prod_number', 'prod_rank'])
joined_dataframe = data_df.join(prod_rank_df.set_index('prod_number'), on='prod_number')
以及生成的数据框,虽然我不喜欢列 prod_number 移动到第一列并且原始索引丢失,但排名列已成功添加:
prod_number client product prod_rank
8589934592 2.1.0.M1 client1 prod11 4
25769803776 2.2.0.M1 client1 prod11 3
42949672960 2.10.0.M1 client1 prod11 2
60129542144 2.12.0.M1 client1 prod11 1
现在,如果我将此示例扩展到多个客户端和产品,则由于某种原因连接失败。所以,如果我正在处理一个看起来像这样的数据框:
client product prod_number
0 client1 prod11 2.0.0.M1
1 client1 prod11 2.0.0.M2
2 client1 prod11 2.0.0.M3
3 client1 prod11 2.0.0.M4
4 client1 prod12 3.0.1.RC
5 client1 prod12 3.0.2.RC
6 client1 prod12 3.0.3.RC
7 client1 prod12 3.0.4.RC
8 client2 prod21 1.4.0
9 client2 prod21 1.4.1
10 client2 prod21 1.4.6
11 client2 prod21 1.4.13
12 client2 prod22 1.3.0.M5
13 client2 prod22 1.3.0.M6
14 client2 prod22 1.3.0.M12
15 client2 prod22 1.3.0.M22
如果我定义一个创建排名并返回连接数据框的函数:
def get_ranked_prods(grouped_dataframe):
prod_rank_df = ks.DataFrame(
{'prod_number': np.array(order_by_index(grouped_dataframe['prod_number'].to_numpy(), index_natsorted(grouped_dataframe['prod_number'].to_numpy(), reverse=True))),
'prod_rank': np.arange(1, len(grouped_dataframe['prod_number'])+1)
},
columns=['prod_number', 'prod_rank'])
joined_dataframe = grouped_dataframe.join(prod_rank_df.set_index('prod_number'), on='prod_number')
return joined_dataframe
我在分组数据帧上调用这个函数如下:
data_df.groupby(by=['client', 'product']).apply(get_ranked_prods)
它失败并出现以下错误:
ValueError: Joining multiple DataFrames only supported for joining on index
我不太确定它为什么会抛出这个错误以及如何解决它。我用熊猫测试了同样的方法,它奏效了。我不确定这是否与丢失原始数据框的索引有关,但我非常感谢您对此提供任何帮助。