0

我正在使用 Spark Koalas 探索和分析大型产品数据集。我想做的一件事是对产品编号进行排序,并根据其编号添加具有产品排名的列。这是一个示例数据集:

import databricks.koalas as ks
ks.set_option('compute.default_index_type', 'distributed')
ks.set_option('compute.ops_on_diff_frames', True)


data_df = ks.DataFrame(
  {
    'client': ['client1', 'client1', 'client1', 'client1'],
    'product': ['prod11', 'prod11', 'prod11', 'prod11'],
    'prod_number': ['2.1.0.M1', '2.2.0.M2', '2.10.0.M3', '2.12.0.M4'],
  },
  columns=['client', 'product', 'prod_number']
) 

看起来像这样:

    client  product prod_number
0   client1 prod11  2.1.0.M1
1   client1 prod11  2.2.0.M1
2   client1 prod11  2.10.0.M1
3   client1 prod11  2.12.0.M1

为了对产品编号进行排序和排名,我使用natsort。不幸的是,koalas 数据框 sort_values 方法不接受像 pandas 这样的关键参数,我可以利用它并将natsort_keygen()其作为关键函数传递,所以我正在执行以下操作:

  1. 我正在创建一个包含产品编号及其排名的新数据框
  2. 将这个新数据框与原始数据框连接起来,如下所示:
prod_rank_df = ks.DataFrame(
                   {'prod_number': np.array(order_by_index(data_df['prod_number'].to_numpy(), index_natsorted(data_df['prod_number'].to_numpy(), reverse=True))),
                    'prod_rank': np.arange(1, len(data_df['prod_number'])+1)
                   },
                   columns=['prod_number', 'prod_rank'])
joined_dataframe = data_df.join(prod_rank_df.set_index('prod_number'), on='prod_number')

以及生成的数据框,虽然我不喜欢列 prod_number 移动到第一列并且原始索引丢失,但排名列已成功添加:

            prod_number client    product       prod_rank
8589934592  2.1.0.M1    client1   prod11         4
25769803776 2.2.0.M1    client1   prod11         3
42949672960 2.10.0.M1   client1   prod11         2
60129542144 2.12.0.M1   client1   prod11         1

现在,如果我将此示例扩展到多个客户端和产品,则由于某种原因连接失败。所以,如果我正在处理一个看起来像这样的数据框:

    client  product prod_number
0   client1 prod11  2.0.0.M1
1   client1 prod11  2.0.0.M2
2   client1 prod11  2.0.0.M3
3   client1 prod11  2.0.0.M4
4   client1 prod12  3.0.1.RC
5   client1 prod12  3.0.2.RC
6   client1 prod12  3.0.3.RC
7   client1 prod12  3.0.4.RC
8   client2 prod21  1.4.0
9   client2 prod21  1.4.1
10  client2 prod21  1.4.6
11  client2 prod21  1.4.13
12  client2 prod22  1.3.0.M5
13  client2 prod22  1.3.0.M6
14  client2 prod22  1.3.0.M12
15  client2 prod22  1.3.0.M22

如果我定义一个创建排名并返回连接数据框的函数:

def get_ranked_prods(grouped_dataframe):
  
  prod_rank_df = ks.DataFrame(
                   {'prod_number': np.array(order_by_index(grouped_dataframe['prod_number'].to_numpy(), index_natsorted(grouped_dataframe['prod_number'].to_numpy(), reverse=True))),
                    'prod_rank': np.arange(1, len(grouped_dataframe['prod_number'])+1)
                   },
                   columns=['prod_number', 'prod_rank'])
  joined_dataframe = grouped_dataframe.join(prod_rank_df.set_index('prod_number'), on='prod_number')
  
  return joined_dataframe

我在分组数据帧上调用这个函数如下:

data_df.groupby(by=['client', 'product']).apply(get_ranked_prods)

它失败并出现以下错误:

ValueError: Joining multiple DataFrames only supported for joining on index

我不太确定它为什么会抛出这个错误以及如何解决它。我用熊猫测试了同样的方法,它奏效了。我不确定这是否与丢失原始数据框的索引有关,但我非常感谢您对此提供任何帮助。

4

0 回答 0