3

我以这种方式从 csv 文件中读取了 Spark Dataframe:

df = ss.read \
     .format("csv") \
     .option("delimiter", ";") \
     .option("header", "false") \
     .option("inferSchema", "true") \
     .option("escape", "\"") \
     .option("multiline", "true") \
     .option("wholeFile", "true") \
     .load(file_path)

数据框是这样的:

|cod_cli|article_name|rank|
|123    |art_1       |1   |
|123    |art_2       |2   |
|123    |art_3       |3   |
|456    |art_4       |1   |
|456    |art_5       |2   |
|456    |art_6       |3   |

我想按列cod_cli对元素进行分组并创建多个列,为分组集中的每个产品创建一个列,并将字典键值作为值,其中键作为列名,值作为与该列相关的值名字,像这样:

|cod_cli|Product 1                  |Product 2                  |Product 3                  |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|

字典值可以是字符串(更好)或映射。我试过这样:

df = df \
     .groupBy(F.col("cod_cli")) \
     .agg(F.collect_list(F.array("cod_art","rank")))

但是通过这种方式,我正在创建一个包含所有分组元素的数组列的列。

请问有人可以帮助我吗?

谢谢

更新

提出的解决方案是这样的:

df = df.withColumn(
            "Product",
            F.to_json(
                F.struct(F.col("cod_art"), F.col("rank"))
            )
        )

通过这种方式,我创建了一个带有所需 json 字符串的列“Product”,例如{cod_art : art_1, rank : 1}

然后:

df = df \
     .groupBy(F.col("cod_cli")) \
     .pivot("rank") \
     .agg(F.first("Product"))

这样,我可以为每个产品创建一个列,按cod_cli属性分组,并处理我有超过 3 个产品作为列的情况:

|cod_cli|1                          |2                          |3               
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
4

2 回答 2

3

您可以在没有pivot昂贵的操作)的情况下使用collect_listof struct,然后to_json使用create_map.

from pyspark.sql import functions as F

df\
  .groupBy("cod_cli").agg(F.collect_list(F.struct("article_name","rank"))\
                          .alias("arr"))\
  .select("cod_cli", *(F.to_json(F.create_map(F.lit("cod_art"),(F.col("arr.article_name")[x]),F.lit("rank"),(F.col("arr.rank")[x])))\
                       .alias("Product{}".format(x+1)) for x in range(3)))\
  .show(truncate=False)

#+-------+------------------------------+------------------------------+------------------------------+
#|cod_cli|Product1                      |Product2                      |Product3                      |
#+-------+------------------------------+------------------------------+------------------------------+
#|123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
#|456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
#+-------+------------------------------+------------------------------+------------------------------+
于 2020-06-15T16:34:35.960 回答
2

也许这很有用-

加载提供的数据

 val data =
      """
        |cod_cli|article_name|rank
        |123    |art_1       |1
        |123    |art_2       |2
        |123    |art_3       |3
        |456    |art_4       |1
        |456    |art_5       |2
        |456    |art_6       |3
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
            .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()

    /**
      * +-------+------------+----+
      * |cod_cli|article_name|rank|
      * +-------+------------+----+
      * |123    |art_1       |1   |
      * |123    |art_2       |2   |
      * |123    |art_3       |3   |
      * |456    |art_4       |1   |
      * |456    |art_5       |2   |
      * |456    |art_6       |3   |
      * +-------+------------+----+
      *
      * root
      * |-- cod_cli: integer (nullable = true)
      * |-- article_name: string (nullable = true)
      * |-- rank: integer (nullable = true)
      */

pivot使用and创建指定的列first(应该在 pyspark 中以最小的变化实现 all are pyspark.sql.functions

    df.groupBy("cod_cli")
      .pivot("rank")
      .agg(first("article_name"))
      .select($"cod_cli", $"1".as("Product 1"), $"2".as("Product 2"), $"3".as("Product 3"))
      .withColumn("Product 1", to_json(expr("named_struct('cod_art', `Product 1`, 'rank', '1')")))
      .withColumn("Product 2", to_json(expr("named_struct('cod_art', `Product 2`, 'rank', '2')")))
      .withColumn("Product 3", to_json(expr("named_struct('cod_art', `Product 3`, 'rank', '3')")))
      .show(false)

    /**
      * +-------+------------------------------+------------------------------+------------------------------+
      * |cod_cli|Product 1                     |Product 2                     |Product 3                     |
      * +-------+------------------------------+------------------------------+------------------------------+
      * |123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
      * |456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
      * +-------+------------------------------+------------------------------+------------------------------+
      */
于 2020-06-15T15:37:47.640 回答