3

在获取外部数据库数据时,我需要弄清楚 spark 是如何在后台工作的。我从 spark 文档中了解到的是,如果我不提及诸如“numPartitons”、“lowerBound”和“upperBound”之类的属性,那么通过 jdbc 读取不是并行的。在这种情况下会发生什么?数据是否由 1 个获取所有数据的特定执行程序读取?那么并行性是如何实现的呢?那个 executor 以后会和其他 executor 共享数据吗?但我相信 executor 不能像这样共享数据。

如果你们中的任何人探索过这个,请告诉我。

编辑我的问题 - 嗨,阿米特,感谢您的回复,但这不是我想要的。让我详细说明一下: - 参考这个 - https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

请参阅下面的代码片段 -

val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)

输出 :

Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|212267|
|                   3| 56714|
|                   4|124824|
|                   2|232193|
|                   0|627712|
+--------------------+------+

在这里,我使用自定义函数 db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000) 读取表,该函数指定基于字段 bu_id 创建 5 个分区,其下限值为 10,上限值为 9000。查看 spark 如何通过 5 个并行连接读取 5 个分区中的数据(如 spark doc 所述)。现在让我们在不提及上述任何参数的情况下阅读此表 -

我只是使用另一个函数获取数据-val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)

在这里,我只传递了 spark 会话(ss)、获取数据的查询(MultiJoin)和另一个用于异常处理的参数(bs)。o/p 如下所示 – Fetch Starts

== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|1253710|

查看数据如何读入一个分区,意味着只产生 1 个连接。问题仍然是这个分区将只在一台机器上,并且将分配一个任务。所以这里没有并行性。那么数据如何分发给其他执行者呢?

顺便说一下,这是我在两种情况下都使用的 spark-submit 命令——

spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar
4

1 回答 1

1

回复:“获取数据外部数据库” 在您的 spark 应用程序中,这通常是将在执行程序上执行的代码部分。可以通过传递火花配置“num-executors”来控制执行器的数量。如果您使用过 Spark 和 RDD/Dataframe,那么您将连接到数据库的示例之一是转换函数,例如 map、flatmap、filter 等。这些函数在执行器上执行时(由 num-executors 配置) 将建立数据库连接并使用它。

这里要注意的一件重要的事情是,如果您使用太多的执行程序,那么您的数据库服务器可能会变得越来越慢并且最终没有响应。如果你给的执行者太少,那么它可能会导致你的火花工作需要更多的时间来完成。因此,您必须根据您的数据库服务器容量找到最佳数量。

回复:“那么并行性是如何实现的?那个执行器稍后会与其他执行器共享数据吗?”

上面提到的并行性是通过配置执行器的数量来实现的。配置执行器的数量只是增加并行度的一种方式,并不是唯一的方式。考虑这样一种情况,即您的数据较小,导致分区较少,那么您将看到较少的并行度。所以你需要有足够数量的分区(那些对应于任务),然后适当的(确定的数量取决于用例)执行器数量来并行执行这些任务。只要您可以单独处理每条记录,它就会扩展,但是一旦您有一个会导致随机播放的操作,您就会看到有关任务和执行程序的统计信息。Spark 将尝试以最佳方式分发数据,以便它可以在最佳水平上工作。

请参阅https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-1/及后续部分以了解有关内部结构的更多信息。

于 2020-04-20T15:30:11.723 回答