- 火花 2.2.0
我从 SQL 脚本转换了以下代码。它已经运行了两个小时,它仍在运行。甚至比 SQL Server 还要慢。有什么事情做得不对吗?
以下是计划,
- 推
table2
送给所有执行者 - 分区
table1
并将分区分发给执行者。 - 并且每一行
table2/t2
连接(交叉连接)的每个分区table1
。
因此交叉连接结果的计算可以分布式/并行运行。(我想,例如假设我有 16 个 executor,在所有 16 个 executor 上保留一份 t2 的副本。然后将表 1 分成 16 个分区,每个 executor 一个。然后每个 executor 对表 1 的一个分区进行计算和 t2。)
case class Cols (Id: Int, F2: String, F3: BigDecimal, F4: Date, F5: String,
F6: String, F7: BigDecimal, F8: String, F9: String, F10: String )
case class Result (Id1: Int, ID2: Int, Point: Int)
def getDataFromDB(source: String) = {
import sqlContext.sparkSession.implicits._
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"$source"
)).load()
.select("Id", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9", "F10")
.as[Cols]
}
val sc = new SparkContext(conf)
val table1:DataSet[Cols] = getDataFromDB("table1").repartition(32).cache()
println(table1.count()) // about 300K rows
val table2:DataSet[Cols] = getDataFromDB("table2") // ~20K rows
table2.take(1)
println(table2.count())
val t2 = sc.broadcast(table2)
import org.apache.spark.sql.{functions => func}
val j = table1.joinWith(t2.value, func.lit(true))
j.map(x => {
val (l, r) = x
Result(l.Id, r.Id,
(if (l.F1!= null && r.F1!= null && l.F1== r.F1) 3 else 0)
+(if (l.F2!= null && r.F2!= null && l.F2== r.F2) 2 else 0)
+ ..... // All kind of the similiar expression
+(if (l.F8!= null && r.F8!= null && l.F8== r.F8) 1 else 0)
)
}).filter(x => x.Value >= 10)
println("Total count %d", j.count()) // This takes forever, the count will be about 100
如何用 Spark 惯用的方式重写它?