0

我想根据 pyspark 数据框的两列中的唯一值对列进行分组。数据框的输出应该是这样的,一旦某个值用于 groupby 并且如果它存在于另一列中,那么它不应该重复。

    |------------------|-------------------|
    |   fruit          |     fruits        | 
    |------------------|-------------------|
    |    apple         |     banana        |
    |    banana        |     apple         |
    |    apple         |     mango         |
    |    orange        |     guava         |
    |    apple         |    pineapple      |
    |    mango         |    apple          |
    |   banana         |     mango         |
    |   banana         |    pineapple      |
    | -------------------------------------|

我尝试使用单列进行分组,需要对其进行修改或需要一些其他逻辑。

df9=final_main.groupBy('fruit').agg(collect_list('fruits').alias('values'))

我从上述查询中得到以下输出;

       |------------------|--------------------------------|
       |   fruit          |     values                     | 
       |------------------|--------------------------------|
       |  apple           | ['banana','mango','pineapple'] |
       |  banana          | ['apple']                      |
       |  orange          | ['guava']                      |
       |  mango           | ['apple']                      |
       |------------------|--------------------------------|

但我想要以下输出;

       |------------------|--------------------------------|
       |   fruit          |     values                     | 
       |------------------|--------------------------------|
       |  apple           | ['banana','mango','pineapple'] |
       |  orange          | ['guava']                      |
       |------------------|--------------------------------|
4

1 回答 1

1

这看起来像一个连接的组件问题。有几种方法可以做到这一点。

1. GraphFrames

您可以使用 GraphFrames 包。数据框的每一行都定义了一条边,您可以创建一个图形,使用df边和所有不同水果的数据框作为顶点。然后调用connectedComponents方法。然后,您可以操纵输出以获得您想要的。

2. 只是 Pyspark

第二种方法有点骇人听闻。为每一行创建一个“哈希”,例如

hashed_df = df.withColumn('hash', F.sort_array(F.array(F.col('fruit'), F.col('fruits'))))

删除该列的所有非不同行

distinct_df = hashed_df.dropDuplicates(['hash'])

再次拆分项目

revert_df = distinct_df.withColumn('fruit', F.col('hash')[0]) \
    .withColumn('fruits', F.col('hash')[1])

按第一列分组

grouped_df = revert_df.groupBy('fruit').agg(F.collect_list('fruits').alias('group'))

如果 Pyspark 抱怨,您可能需要使用“字符串化”您的哈希F.concat_ws,但想法是一样的。

于 2019-08-30T13:03:20.370 回答