12

我正在运行一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较。最终结果将是这样的:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
|     ip_address|dataset1|dataset2       |dataset3       |dataset4 |dataset5  |dataset6|      date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx|     1  |              1|              0|        0|         0|      0 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              0|              0|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              1|              0|        1|         0|      0 |2017-11-06|
---------------------------------------------------------------------------------------------------

为了进行比较,我将语句的dataframes结果转换为对象。像这样:hiveContext.sql("query")Fastutil

val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

然后,我使用iterator迭代每个集合并使用FileWriter.

val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
     val p = dfIterator.next().toString
     //logic
}

我正在运行应用程序--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g

该过程总共运行大约 18-19 小时,每天大约有 4-5 百万条记录进行一对一比较。

但是,当我检查 Application Master UI 时,我注意到在完成初始转换后没有任何活动发生dataframesfastutil collection objects这只需在作业启动后几分钟)。我看到代码中使用的countandcollect语句产生了新的工作,直到转换完成。之后,运行比较时不会启动新作业。

  • 这意味着什么?这是否意味着分布式处理根本没有发生?

  • 我知道集合对象不被视为 RDD,
    这可能是原因吗?

  • spark如何在不使用分配的资源的情况下执行我的程序?

任何帮助将不胜感激,谢谢!

4

1 回答 1

8

行后:

val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

尤其是 上一行的那部分:

df.map(r => r(0).toString).collect()

collect是需要注意的最重要的事情,从来没有执行过 Spark 作业dfBuffer(这是一种常规的本地 JVM 数据结构)。

这是否意味着分布式处理根本没有发生?

正确的。collect将所有数据带到驱动程序运行的单个 JVM 上(这正是您不应该这样做的原因,除非......您知道自己在做什么以及它可能导致什么问题)。

我认为以上回答了所有其他问题。


比较两个数据集(以 Spark 和分布式方式)的问题的一个可能解决方案是将join数据集与参考数据集count进行比较,并比较记录数是否没有变化。

于 2018-03-13T20:23:39.423 回答