0

所以我有一个关于如何用 Python 在 Spark 中编程的问题。请记住,我不是在要求您编写代码,而是在询问如何去做。我很困惑如何在 Spark 中做到这一点。任何帮助或想法将不胜感激。

1) 从数据库中获取所有行并创建 pyspark.sql.DataFrame - 完成
2) 转换所述 DataFrame 中的行- 完成
3) 从 DataFrame 创建一个广播变量只有两列 [ID, GROUPID] -
广播变量的完成原因是分区/聚类。
4) 循环 DataFrame 并搜索广播变量以查看此迭代 GROUPID 是否存在任何 OTHER ID。如果没有找到记录,则 NULL 此迭代 GROUPID

示例:广播变量

+-------+---------+
|     ID|  GROUPID|
+-------+---------+
| 363345|    95124|
| 363356|    95124|
| 363359|    88896|
| 363361|    50012|<===== only one of this groupid in Broadcast variable
| 375362|    62551|
| 363487|    62551|
| 363489|    88896|
+-------+---------+


需要循环数据帧(300K+ 行)并将广播变量检查为空 GROUPID

+------+---------+-------+----+-------+------------+
|    ID|PRODUCTID|    ARM|SORT|GROUPID|        NAME|
+------+---------+-------+----+-------+------------+
|363345|   523927|5888208|  10|  95124|Enalapril...|
|363356|   523927|5888390|  10|  95124|LISINOPL5...|
|363359|   523927|5888444|  10|  88896|RANTUDEUR...|
|363361|   523927|5888450|  10|  50012|POLYALFA1...|<===== This record GROUPID should be nulled, only one record found in Broadcast variable
|375362|   523927|5888527|  10|  62551|POLAUTFA2...|
|375360|   523927|5894976|  10|   null|ENCERACAF...|
|363487|   523927|5905131|  10|  62551|Poly Alco...|
|363488|   523927|5905148|  10|   null|     Poly...|
|363489|   523927|5905160|  10|  88896|Eapril688...|
|363495|   523927|5909258|  10|   null| Eapril77...|
+------+---------+-------+----+-------+------------+


CREATING the DataFrame
    df = spark.createDataFrame(
        [
            (363345, 523927, 5888208, 10, 95124, 'Enalapril...'), 
            (363356, 523927, 5888390, 10, 95124, 'LISINOPL5...'), 
            (363359, 523927, 5888444, 10, 88896, 'RANTUDEUR...'), 
            (363361, 523927, 5888450, 10, 50012, 'POLYALFA1...'), 
            (375362, 523927, 5888527, 10, 62551, 'POLAUTFA2...'), 
            (375360, 523927, 5894976, 10,  None, 'ENCERACAF...'), 
            (363487, 523927, 5905131, 10, 62551, 'Poly Alco...'), 
            (363488, 523927, 5905148, 10,  None, 'Poly...'), 
            (363489, 523927, 5905160, 10, 88896, 'Eapril688...'), 
            (363495, 523927, 5909258, 10,  None, 'Eapril77...')
        ],
        ['ID', 'PRODUCTID', 'ARM', 'SORT', 'GROUPID', 'NAME']
    )

创建广播变量

ID_GROUPID_Dictionary = {}    
for row in df.rdd.collect():
    if(row['GROUPID'] != None):
        ID_GROUPID_Dictionary[int(row['ID'])] = int(row['GROUPID']) 
df_FROMDB_READONLY_BROADCAST = spark.sparkContext.broadcast(ID_GROUPID_Dictionary) 
4

2 回答 2

0

为此使用广播似乎是最重要的,这里还有很多其他选项可以使用,一个示例可以是通过 groupid 聚合您的数据(一个数据集函数),然后运行平面图检查大小是否大于 2 。如果小于 2,则将值更改为 null,如果不是,则按原样返回所有值。您可以在 sql 和 dataframe 或 RDD 中执行此操作。取决于你,但你的代码会更干净。如有任何问题,请随时评论我的答案。

于 2018-07-11T16:28:58.803 回答
0

顺便说一句,这就是我解决这个问题的方式。@Ilya 感谢您的注意。如果有人看到更好的方法,请告诉我。

GROUPID_countList = df.groupBy(df.GROUPID).count().collect()                   
replaceGROUPIDUDF = udf(lambda x: None if x is None else GROUPIDCount(x), IntegerType())      
def GROUPIDCount(grpid):
    for x in GROUPID_countList:            
        if(x["GROUPID"] == grpid): #DEBUG print("{} - {} : {} ".format(x["GROUPID"], grpid, x["count"] ))
            return x["count"]             
    return 0

df = df.withColumn('GROUPID_null', (when(replaceGROUPIDUDF(df['GROUPID']) < 2, lit(None).cast("string"))
                                        .otherwise(df['GROUPID']))) \
                            .drop('GROUPID') \
                            .withColumnRenamed('GROUPID_null', 'GROUPID') 
于 2018-08-21T14:10:33.127 回答