3

我是 pyspark 的新手

我有一个看起来像的数据集(只是几列的快照)

数据描述

我想按键分组我的数据。我的钥匙是

CONCAT(a.div_nbr,a.cust_nbr)

我的最终目标是将数据转换为 JSON,格式如下

k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],....

例如

248138339 [{ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA } , 
        { PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA} ,{...,...,} ],

1384611034793[{},{},{}],....

我创建了一个数据框(我加入两个表基本上是为了获得更多字段)

joinstmt = sqlContext.sql(
          "SELECT a.precima_id , CONCAT(a.div_nbr,a.cust_nbr) as
                  key,a.prod_nbr , a.prod_desc,a.prod_brnd ,      a.pack_size , a.qty_uom , a.sales_opp , a.prc_guidance , a.pim_mrch_ctgry_desc , a.pim_mrch_ctgry_id , b.start_date,b.end_date 

FROM scoop_dtl a join scoop_hdr b on (a.precima_id =b.precima_id)")

现在,为了获得上述结果,我需要根据键对结果进行分组,我做了以下

groupbydf = joinstmt.groupBy("key")

这导致 intp 分组数据,阅读后我知道我不能直接使用它,我需要将它转换回数据帧来存储它。

我是新手,需要一些帮助才能将其转换回数据框,或者如果还有其他方法,我将不胜感激。

4

2 回答 2

5

如果您加入的数据框如下所示:

gender  age
M   5
F   50
M   10
M   10
F   10

然后,您可以使用下面的代码来获得所需的输出

joinedDF.groupBy("gender") \ 
    .agg(collect_list("age").alias("ages")) \
    .write.json("jsonOutput.txt")

输出如下所示:

{"gender":"F","ages":[50,10]}
{"gender":"M","ages":[5,10,10]}

如果您有多个列,例如姓名,薪水。您可以添加如下列:

df.groupBy("gender")
    .agg(collect_list("age").alias("ages"),collect_list("name").alias("names"))

您的输出将如下所示:

{"gender":"F","ages":[50,10],"names":["ankit","abhay"]}
{"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]}
于 2018-01-12T09:53:47.303 回答
2

不能GroupedData直接使用。它必须先聚合。它可以被内置函数的聚合部分覆盖,collect_list但使用DataFrameWriter.

In 可以尝试这样的事情:

from pyspark.sql import Row
import json

def make_json(kvs):
  k, vs = kvs
  return json.dumps({k[0]: list(vs)})

(df.select(struct(*keys), values)
    .rdd
    .mapValues(Row.asDict)
    .groupByKey()
    .map(make_json))

saveAsTextFile

于 2016-02-27T19:14:05.340 回答