我正在运行一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较。最终结果将是这样的:
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| ip_address|dataset1|dataset2 |dataset3 |dataset4 |dataset5 |dataset6| date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx| 1 | 1| 0| 0| 0| 0 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 0| 0| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 1| 0| 1| 0| 0 |2017-11-06|
---------------------------------------------------------------------------------------------------
为了进行比较,我将语句的dataframes
结果转换为对象。像这样:hiveContext.sql("query")
Fastutil
val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
然后,我使用iterator
迭代每个集合并使用FileWriter
.
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
val p = dfIterator.next().toString
//logic
}
我正在运行应用程序--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g
该过程总共运行大约 18-19 小时,每天大约有 4-5 百万条记录进行一对一比较。
但是,当我检查 Application Master UI 时,我注意到在完成初始转换后没有任何活动发生dataframes
(fastutil collection objects
这只需在作业启动后几分钟)。我看到代码中使用的count
andcollect
语句产生了新的工作,直到转换完成。之后,运行比较时不会启动新作业。
这意味着什么?这是否意味着分布式处理根本没有发生?
我知道集合对象不被视为 RDD,
这可能是原因吗?spark如何在不使用分配的资源的情况下执行我的程序?
任何帮助将不胜感激,谢谢!