So I have a question about how to program this in Spark with Python. Keep in mind, I'm not asking you to write the code for me; I'm asking how to go about it. I'm confused about how to do this in Spark. Any help or ideas would be appreciated.
1) Fetch all rows from the database and create a pyspark.sql.DataFrame - DONE
2) Transform the rows in said DataFrame - DONE
3) Create a broadcast variable from the DataFrame with only two columns, [ID, GROUPID] - DONE
The reason for the broadcast variable is partitioning/clustering.
4) Loop over the DataFrame and search the broadcast variable to see whether any OTHER ID exists for this iteration's GROUPID. If no record is found, NULL this iteration's GROUPID.
Example: the broadcast variable
+-------+---------+
| ID| GROUPID|
+-------+---------+
| 363345| 95124|
| 363356| 95124|
| 363359| 88896|
| 363361| 50012|<===== only one of this groupid in Broadcast variable
| 375362| 62551|
| 363487| 62551|
| 363489| 88896|
+-------+---------+
Need to loop over the DataFrame (300K+ rows) and check it against the broadcast variable in order to null the GROUPID
+------+---------+-------+----+-------+------------+
| ID|PRODUCTID| ARM|SORT|GROUPID| NAME|
+------+---------+-------+----+-------+------------+
|363345| 523927|5888208| 10| 95124|Enalapril...|
|363356| 523927|5888390| 10| 95124|LISINOPL5...|
|363359| 523927|5888444| 10| 88896|RANTUDEUR...|
|363361| 523927|5888450| 10| 50012|POLYALFA1...|<===== This record GROUPID should be nulled, only one record found in Broadcast variable
|375362| 523927|5888527| 10| 62551|POLAUTFA2...|
|375360| 523927|5894976| 10| null|ENCERACAF...|
|363487| 523927|5905131| 10| 62551|Poly Alco...|
|363488| 523927|5905148| 10| null| Poly...|
|363489| 523927|5905160| 10| 88896|Eapril688...|
|363495| 523927|5909258| 10| null| Eapril77...|
+------+---------+-------+----+-------+------------+
CREATING the DataFrame
df = spark.createDataFrame(
    [
        (363345, 523927, 5888208, 10, 95124, 'Enalapril...'),
        (363356, 523927, 5888390, 10, 95124, 'LISINOPL5...'),
        (363359, 523927, 5888444, 10, 88896, 'RANTUDEUR...'),
        (363361, 523927, 5888450, 10, 50012, 'POLYALFA1...'),
        (375362, 523927, 5888527, 10, 62551, 'POLAUTFA2...'),
        (375360, 523927, 5894976, 10, None, 'ENCERACAF...'),
        (363487, 523927, 5905131, 10, 62551, 'Poly Alco...'),
        (363488, 523927, 5905148, 10, None, 'Poly...'),
        (363489, 523927, 5905160, 10, 88896, 'Eapril688...'),
        (363495, 523927, 5909258, 10, None, 'Eapril77...')
    ],
    ['ID', 'PRODUCTID', 'ARM', 'SORT', 'GROUPID', 'NAME']
)
CREATING the broadcast variable
# Collect the non-null ID -> GROUPID pairs to the driver and broadcast them.
ID_GROUPID_Dictionary = {}
for row in df.select('ID', 'GROUPID').collect():
    if row['GROUPID'] is not None:
        ID_GROUPID_Dictionary[int(row['ID'])] = int(row['GROUPID'])
df_FROMDB_READONLY_BROADCAST = spark.sparkContext.broadcast(ID_GROUPID_Dictionary)