0

在 Spark PushdownQuery 中,由数据库的 SQL 引擎处理,并根据它的结果构造数据帧。因此,火花查询该查询的结果。

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah )"""

val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.load()

我可以从 spark - mysql 中的另一个参考(https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-queries-10x)中看到,下推查询的并行性是通过触发多个基于参数 numPartitions 和 partitionColumn 的查询。这与 sqoop 的分布方式非常相似。比如说上面给出的参数 numPartitions = 4 的例子;partitionColumn = COUNTRY_CODE 并且在我们的表中 COUNTRY_CODE 的值范围落在 (000,999) 上。

构建了 4 个查询;发射到 DB 和数据帧是根据这些结果构建的(在这种情况下并行度为 4)。

Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE  <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE  <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE  <= 999

我现在的问题是,如何在 spark (version 2.1) + hbase (Query engine - BIGSQL) 中使用这种方法实现并行性?它现在没有给我并行性。桥接 spark-hbase 的驱动程序是否需要更新?或火花需要这样做?或者什么样的改变有助于它实现这一目标?一些方向对我有帮助。谢谢 !

4

1 回答 1

0

为了获得最佳性能,我建议使用 --num-executors 4 和 --executor-cores 1 开始您的 spark 作业,因为 jdbc 连接是单线程的,每个查询在一个核心上运行一个任务。通过进行此配置更改,当您的作业运行时,您可以观察并行运行的任务,这是每个执行器中的核心正在使用中。

请改用以下函数:

val connectionProperties: Properties = new Properties
connectionProperties.put("user", "xxxx")
connectionProperties.put("password", "xxxx")
connectionProperties.put("fetchsize", "10000") //fetches 10000 records at once per task
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
connectionProperties

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) tbl_alias"""

val dbDataFrame = spark.read.jdbc(url, pushdownQuery, "COUNTRY_CODE", 0L, 4L, 4, connectionProperties)

参考https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String, lowerBound:Long,upperBound:Long,numPartitions:Int,connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame

于 2018-08-23T02:47:53.403 回答