在获取外部数据库数据时,我需要弄清楚 spark 是如何在后台工作的。我从 spark 文档中了解到的是,如果我不提及诸如“numPartitons”、“lowerBound”和“upperBound”之类的属性,那么通过 jdbc 读取不是并行的。在这种情况下会发生什么?数据是否由 1 个获取所有数据的特定执行程序读取?那么并行性是如何实现的呢?那个 executor 以后会和其他 executor 共享数据吗?但我相信 executor 不能像这样共享数据。
如果你们中的任何人探索过这个,请告诉我。
编辑我的问题 - 嗨,阿米特,感谢您的回复,但这不是我想要的。让我详细说明一下: - 参考这个 - https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
请参阅下面的代码片段 -
val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)
输出 :
Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
| 1|212267|
| 3| 56714|
| 4|124824|
| 2|232193|
| 0|627712|
+--------------------+------+
在这里,我使用自定义函数 db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000) 读取表,该函数指定基于字段 bu_id 创建 5 个分区,其下限值为 10,上限值为 9000。查看 spark 如何通过 5 个并行连接读取 5 个分区中的数据(如 spark doc 所述)。现在让我们在不提及上述任何参数的情况下阅读此表 -
我只是使用另一个函数获取数据-val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)
在这里,我只传递了 spark 会话(ss)、获取数据的查询(MultiJoin)和另一个用于异常处理的参数(bs)。o/p 如下所示 – Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()| count|
+--------------------+-------+
| 0|1253710|
查看数据如何读入一个分区,意味着只产生 1 个连接。问题仍然是这个分区将只在一台机器上,并且将分配一个任务。所以这里没有并行性。那么数据如何分发给其他执行者呢?
顺便说一下,这是我在两种情况下都使用的 spark-submit 命令——
spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar