7

我试图写一个从 DataFrame 到 DataFrame 的转换方法。我也想通过scalatest来测试它。

如您所知,在带有 Scala API 的 Spark 2.x 中,您可以按如下方式创建 SparkSession 对象:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.bulider
     .config("spark.master", "local[2]")
     .getOrCreate()

此代码适用于单元测试。但是,当我使用 spark-submit 运行此代码时,集群选项不起作用。例如,

spark-submit --master yarn --deploy-mode client --num-executors 10 ...

不创建任何执行者。

我发现当我删除config("master", "local[2]")部分上述代码时应用了 spark-submit 参数。但是,没有大师设置单元测试代码不起作用。

我试图将火花(SparkSession)对象生成部分拆分为测试和主要部分。但是有太多的代码块需要 spark,例如import spark.implicit,_spark.createDataFrame(rdd, schema).

是否有任何最佳实践来编写代码来创建用于测试和运行 spark-submit 的 spark 对象?

4

4 回答 4

5

一种方法是创建一个提供 SparkContext/SparkSession 的特征,并在您的测试用例中使用它,如下所示:

trait SparkTestContext {
  private val master = "local[*]"
  private val appName = "testing"
  System.setProperty("hadoop.home.dir", "c:\\winutils\\")
  private val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)
    .set("spark.driver.allowMultipleContexts", "false")
    .set("spark.ui.enabled", "false")

  val ss: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val sc: SparkContext = ss.sparkContext
  val sqlContext: SQLContext = ss.sqlContext
}

然后您的测试类标题如下所示:

class TestWithSparkTest extends BaseSpec with SparkTestContext with Matchers{

于 2017-07-31T10:52:39.607 回答
2

我制作了一个版本,Spark 将在测试后正确关闭。

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

trait SparkTest extends FunSuite with BeforeAndAfterAll with Matchers {
  var ss: SparkSession = _
  var sc: SparkContext = _
  var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    val master = "local[*]"
    val appName = "MyApp"
    val conf: SparkConf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      .set("spark.driver.allowMultipleContexts", "false")
      .set("spark.ui.enabled", "false")

    ss = SparkSession.builder().config(conf).getOrCreate()

    sc = ss.sparkContext
    sqlContext = ss.sqlContext
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    sc.stop()
    super.afterAll()
  }
}
于 2018-04-07T15:00:54.663 回答
1

带有参数 --master yarn 的 spark-submit 命令正在设置 yarn master。这将与您的代码 master("x") 冲突,即使使用 like master("yarn")。

如果你想使用 import sparkSession.implicits._ like toDF ,toDS 或其他函数,你可以使用如下创建的本地 sparkSession 变量:

val spark = SparkSession.builder().appName("YourName").getOrCreate()

没有在 spark-submit --master yarn 中设置 master("x") ,而不是在本地机器中。

我建议:不要在代码中使用全局 sparkSession。这可能会导致一些错误或异常。

希望这可以帮助你。祝你好运!

于 2018-08-03T01:32:23.790 回答
0

如何定义一个对象,在该对象中方法创建 SparkSession 的单例实例,例如MySparkSession.get(),并将其作为参数传递给每个单元测试。

在你的 main 方法中,你可以创建一个单独的 SparkSession 实例,它可以有不同的配置。

于 2018-02-12T08:57:17.220 回答