0

我有一个应该执行数据准备步骤的 Spark 应用程序。我编写了一些单元测试来使用 deequ 检查数据质量,并且像往常一样,我想运行我的一个单元测试,但是我遇到了如下错误:

Error while instantiating 'org.apache.spark.sql.internal.SessionStateBuilder':
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.internal.SessionStateBuilder':
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1148)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:159)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:997)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:658)
    at com.bigelectrons.housingml.dataprep.HousingDataTest.$anonfun$new$1(HousingDataTest.scala:32)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:1680)
    at org.scalatest.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:1692)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FlatSpecLike.runTest(FlatSpecLike.scala:1692)
    at org.scalatest.FlatSpecLike.runTest$(FlatSpecLike.scala:1674)
    at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike.$anonfun$runTests$1(FlatSpecLike.scala:1750)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.FlatSpecLike.runTests(FlatSpecLike.scala:1750)
    at org.scalatest.FlatSpecLike.runTests$(FlatSpecLike.scala:1749)
    at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
    at org.scalatest.Suite.run(Suite.scala:1147)
    at org.scalatest.Suite.run$(Suite.scala:1129)
    at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:1795)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FlatSpecLike.run(FlatSpecLike.scala:1795)
    at org.scalatest.FlatSpecLike.run$(FlatSpecLike.scala:1793)
    at com.bigelectrons.housingml.dataprep.HousingDataTest.org$scalatest$BeforeAndAfterAll$$super$run(HousingDataTest.scala:20)
    at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    at com.bigelectrons.housingml.dataprep.HousingDataTest.run(HousingDataTest.scala:20)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
Caused by: java.lang.IllegalStateException: LiveListenerBus is stopped.
    at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
    at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:99)
    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:138)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:138)
    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:137)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:335)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1145)
    ... 60 more

以下是我访问 Spark 会话的方法:

  val spark: SparkSession = SparkSession.builder().config("spark.master", "local").appName("housing-data-test").getOrCreate()

这是我的实际代码:

"simple unit test" should "check for data correctness" in {
    appCfgT match {
      case Success(appCfg) =>
        preStart()
        val rawDF: DataFrame = spark
          .read
          .format("csv")
          .option("delimiter", ",")
          .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
          .option("inferSchema", value = true)
          .option("mode", "DROPMALFORMED")
          .option("header", value = true)
          .option("multiLine", value = true)
          .schema(encodedHousingSchema)
          .load(appCfg.sourceFileUrl)

        DataTestUtils.withSpark { session =>
          val rows = session.sparkContext.parallelize(Seq(new HousingModel()))
          val data = session.createDataFrame(rows)
          println("******************************************************************************")
          val verificationResult = VerificationSuite()
            .onData(data)
            .addCheck(
              Check(CheckLevel.Error, "unit testing my data")
                .hasSize(_ == 4092) // we expect 4092 rows
                .isComplete("id") // should never be NULL
                .isUnique("id") // should not contain duplicates
                .isComplete("productName") // should never be NULL
                // should only contain the values "high" and "low"
                .isContainedIn("priority", Array("high", "low"))
                .isNonNegative("numViews") // should not contain negative values
                // at least half of the descriptions should contain a url
                .containsURL("description", _ >= 0.5)
                // half of the items should have less than 10 views
                .hasApproxQuantile("numViews", 0.5, _ <= 10)
            )
            .run()
        }

      case Failure(fail) =>
        // TODO: Fail the unit test!
    }
  }
4

0 回答 0