1
  • 火花 2.2.0

我从 SQL 脚本转换了以下代码。它已经运行了两个小时,它仍在运行。甚至比 SQL Server 还要慢。有什么事情做得不对吗?

以下是计划,

  1. table2送给所有执行者
  2. 分区table1并将分区分发给执行者。
  3. 并且每一行table2/t2连接(交叉连接)的每个分区table1

因此交叉连接结果的计算可以分布式/并行运行。(我想,例如假设我有 16 个 executor,在所有 16 个 executor 上保留一份 t2 的副本。然后将表 1 分成 16 个分区,每个 executor 一个。然后每个 executor 对表 1 的一个分区进行计算和 t2。)

case class Cols (Id: Int, F2: String, F3: BigDecimal, F4: Date, F5: String,
                 F6: String, F7: BigDecimal, F8: String, F9: String, F10: String )
case class Result (Id1: Int, ID2: Int, Point: Int)

def getDataFromDB(source: String) = {
    import sqlContext.sparkSession.implicits._

    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"$source"
    )).load()
      .select("Id", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9", "F10")
      .as[Cols]
  }

val sc = new SparkContext(conf)
val table1:DataSet[Cols] = getDataFromDB("table1").repartition(32).cache()
println(table1.count()) // about 300K rows

val table2:DataSet[Cols] = getDataFromDB("table2") // ~20K rows
table2.take(1)
println(table2.count())
val t2 = sc.broadcast(table2)

import org.apache.spark.sql.{functions => func}
val j = table1.joinWith(t2.value, func.lit(true))

j.map(x => {
  val (l, r) = x
  Result(l.Id, r.Id, 
  (if (l.F1!= null && r.F1!= null && l.F1== r.F1) 3 else 0)
  +(if (l.F2!= null && r.F2!= null && l.F2== r.F2) 2 else 0)
  + ..... // All kind of the similiar expression
  +(if (l.F8!= null && r.F8!= null && l.F8== r.F8) 1 else 0)
  )
}).filter(x => x.Value >= 10)
println("Total count %d", j.count()) // This takes forever, the count will be about 100

如何用 Spark 惯用的方式重写它?

参考:https ://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html

4

1 回答 1

2

(不知怎的,我感觉好像我已经看过代码了)

代码很慢,因为您只使用一个任务来使用 JDBC 从数据库中加载整个数据集,尽管cache它并没有从中受益。

首先查看ExecutorsWeb UI 中的物理计划和选项卡,以了解单个执行器和完成工作的单个任务。

您应该使用以下方法之一来微调要加载的任务数量:

  1. 为 JDBC 数据源使用partitionColumn, lowerBound,选项upperBound
  2. 使用predicates选项

请参阅Spark 官方文档中的JDBC To Other Databases

加载完成后,您应该改进最后一个操作并在以下行之后count添加...另一个操作:count

val table1: DataSet[Cols] = getDataFromDB("table1").repartition(32).cache()
// trigger caching as it's lazy in Dataset API
table1.count

整个查询缓慢的原因是您仅table1在执行恰好在末尾(!)的操作时才将其标记为缓存,换句话说,cache没有任何用处,更重要的是使查询性能更差。

性能也会随着你而提高table2.cache.count

如果要进行交叉连接,请使用crossJoin运算符。

crossJoin(right: Dataset[_]): DataFrame与另一个 DataFrame 显式笛卡尔连接。

请注意 scaladoc 的注释crossJoin(没有双关语)。

如果没有可以下推的额外过滤器,笛卡尔连接非常昂贵。

鉴于所有可用的优化,Spark 已经处理了以下要求。

因此交叉连接结果的计算可以分布式/并行运行。

那是 Spark 的工作(同样,不是双关语)。

以下要求要求广播。

例如,我想假设我有 16 个执行者,在所有 16 个执行者上保留一份 t2 副本。然后将表1分成16个分区,每个executor一个。然后每个 executor 对表 1 和 t2 的一个分区进行计算。)

使用广播函数提示 Spark SQL 的引擎在广播模式下使用 table2。

broadcast[T](df: Dataset[T]): Dataset[T]将 DataFrame 标记为足够小以用于广播连接。

于 2017-07-19T07:35:48.067 回答