
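I am using Amazon deequ to generate test cases. It returns the list of methods below, which I want to use in a later function instead of coding each one individually.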

var rows = suggestionDataFrame.select("_3").collect().map(_.getString(0)).mkString(" ")
// var rows = suggestionDataFrame.select("_3").collect.map { row => row.toString() .mkString("")} 

These lines return the following list of methods:

.hasCompleteness("Id", _ >= 0.95, Some("It should be above 0.95!")) .isNonNegative("Id") 
.isComplete("LastModifiedDate")

In the next function, I want to pass these values in like this:

 val verificationResult: VerificationResult = {
      VerificationSuite()
        .onData(datasource)
        .addCheck(
          Check(CheckLevel.Error, "Data Validation Check")
            // this is how I want to pass them
            .hasCompleteness("Id", _ >= 0.95, Some("It should be above 0.95!"))
            .isNonNegative("Id")
            .isComplete("LastModifiedDate"))
          .run()
    }

When I pass the rows directly, as below, it throws an error:

 val verificationResult: VerificationResult = {
      VerificationSuite()
        .onData(datasource)
        .addCheck(
          Check(CheckLevel.Error, "Data Validation Check")
           rows).run() //throwing error here
    }
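Why this fails: a Scala method chain such as `.isNonNegative("Id").isComplete("LastModifiedDate")` is type-checked and resolved at compile time, so a `String` that merely contains that text cannot be placed after `Check(...)`. The typed counterpart of "a list of method calls" is a list of functions, each taking a check and returning an extended one, applied in order with `foldLeft`. A plain-Scala sketch of that pattern follows; `FluentCheck` and its methods are hypothetical stand-ins for deequ's `Check`, so the example runs without Spark or deequ:

```scala
// Hypothetical, simplified builder standing in for deequ's `Check`.
// Each method returns a new, extended instance instead of mutating this one.
final case class FluentCheck(description: String, rules: Vector[String] = Vector.empty) {
  def isComplete(column: String): FluentCheck =
    copy(rules = rules :+ s"isComplete($column)")

  def isNonNegative(column: String): FluentCheck =
    copy(rules = rules :+ s"isNonNegative($column)")
}

object FoldDemo {
  // "A list of method calls", represented as functions rather than source text.
  val steps: Seq[FluentCheck => FluentCheck] = Seq[FluentCheck => FluentCheck](
    _.isNonNegative("Id"),
    _.isComplete("LastModifiedDate")
  )

  // Apply the steps in order, starting from the base check.
  val built: FluentCheck = steps.foldLeft(FluentCheck("Data Validation Check")) {
    (check, step) => step(check)
  }
}
```

With deequ itself the steps would be written as, for example, `(c: Check) => c.isComplete("LastModifiedDate")`. They still have to be written as code, though; this pattern does not turn the suggested `codeForConstraint` strings into calls.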

Is there any way to do this?

Reference: https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

This is what I have tried so far:

package com.myorg.dataquality

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import com.amazon.deequ.suggestions.{ ConstraintSuggestionRunner, Rules }
import com.amazon.deequ.{ VerificationSuite, VerificationResult }
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{ Check, CheckLevel }
import scala.collection.mutable.ArrayBuffer

object DataVerification2 {
  def main(args: Array[String]) {

    val spark = SparkSession.builder.appName("Sample")
      .master("local")
      .getOrCreate()

    val datasource = spark.read.format("jdbc").option("url", "jdbc:sqlserver://host:1433;database=mydb").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable", "dbo.table").option("user", "myuser").option("password", "password").option("useSSL", "false").load()
    datasource.printSchema()

    val datadestination = spark.read.format("jdbc").option("url", "jdbc:sqlserver://host:1433;database=mydb").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable", "dbo.table").option("user", "myuser").option("password", "password").option("useSSL", "false").load()
    //datapond.printSchema()

    import spark.implicits._
    //Compute constraint suggestions for us on the data
    val suggestionResult = {
      ConstraintSuggestionRunner()
        // data to suggest constraints for
        .onData(datasource)
        // default set of rules for constraint suggestion
        .addConstraintRules(Rules.DEFAULT)
        // run data profiling and constraint suggestion
        .run()
    }

    // We can now investigate the constraints that Deequ suggested.
    val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
      case (column, suggestions) =>
        suggestions.map { constraint =>
          (column, constraint.description, constraint.codeForConstraint)
        }
    }.toSeq.toDS()

    suggestionDataFrame.toJSON.collect.foreach(println)

    var rows = suggestionDataFrame.select("_3").collect().map(_.getString(0)).mkString(" ")
    // var rows = suggestionDataFrame.select("_3").collect.map { row => row.toString() .mkString("")}
    // var rows = suggestionDataFrame.select("_3").collect().map(t => println(t))
    // var rows = suggestionDataFrame.select("_3").collect.map(_.toSeq)

    var checks = Array[Check]()
    var checkLevel = "Check(CheckLevel.Error)"
    var finalcheck = checkLevel.concat(rows)

    checks :+ finalcheck

    // I am expecting validation result but this is returning me empty result
    val verificationResult: VerificationResult = {
      VerificationSuite().onData(datadestination).addChecks(checks).run()

    }

    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.show()
    resultDataFrame.filter(resultDataFrame("constraint_status") === "Failure").toJSON.collect.foreach(println)

  }
}

This returns an empty result:

+-----+-----------+------------+----------+-----------------+------------------+
|check|check_level|check_status|constraint|constraint_status|constraint_message|
+-----+-----------+------------+----------+-----------------+------------------+
+-----+-----------+------------+----------+-----------------+------------------+

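It looks like I am failing to add the elements to the array, or implementing this the wrong way, and I am looking for some suggestions.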
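A note on `checks :+ finalcheck`: `:+` never modifies the collection it is called on. It returns a new collection, which this code discards, so `checks` is still empty when it reaches `addChecks`, consistent with the empty result table. Even if the new array were kept, `finalcheck` is a `String` built by concatenation, not a `Check`. The append semantics in plain Scala (the values are made up for illustration):

```scala
object AppendDemo {
  val original: Array[String] = Array("a")

  // Result discarded, like `checks :+ finalcheck`: `original` is unchanged.
  original :+ "b"

  // Result kept: a new array holding both elements.
  val appended: Array[String] = original :+ "b"

  // With a `var`, `:+=` reassigns the variable to the new collection.
  var grown: Seq[String] = Seq()
  grown :+= "x"
}
```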

Update 1:

I tried the code below, but it throws an error:

val constraints = suggestionResult.constraintSuggestions.flatMap {
      case (column, suggestions) =>
        suggestions.map { constraint =>
          (constraint.codeForConstraint)
        }
    }

   val generatedCheck  =  Check(CheckLevel.Warning, "generated constraints", constraints)
    val verificationResult = VerificationSuite()
      .onData(datadestination)
      .addChecks(generatedCheck)
      .run()

Error:

type mismatch; found : scala.collection.immutable.Iterable[String] required: Seq[com.amazon.deequ.constraints.Constraint]

Update 2:

    var rows = suggestionDataFrame.select("_3").collect.map(_.toSeq)
    var checks: Seq[Check] = Seq()
    checks :+ rows

   val generatedCheck  =  Check(CheckLevel.Warning, "generated constraints", checks)
   val verificationResult = VerificationSuite()
      .onData(datadestination)
      .addChecks(generatedCheck)
      .run()

Error:

type mismatch; found : Seq[com.amazon.deequ.checks.Check] required: Seq[com.amazon.deequ.constraints.Constraint]


1 Answer


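If I understand your question correctly, you want to add the suggested constraints to your verification run. Here is a link to a code snippet in deequ that does something similar: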

https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunner.scala#L294

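I hope this can serve as a template for how to proceed. You need to collect the constraints from the constraint suggestions (not from the DataFrame) and create a check based on them.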
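The key difference is between `codeForConstraint`, which is only a `String` of Scala source, and `constraint`, which is the object a `Check` can actually hold. Below is a plain-Scala sketch of "collect the constraints, then build one check from them". `MiniConstraint`, `MiniSuggestion` and `MiniCheck` are hypothetical, simplified stand-ins for deequ's `Constraint`, `ConstraintSuggestion` and `Check`, used so the example runs without Spark:

```scala
// Hypothetical stand-ins, not deequ classes. A suggestion carries both a
// typed constraint object and a String of source code describing it.
final case class MiniConstraint(name: String, holds: Seq[Option[Int]] => Boolean)
final case class MiniSuggestion(constraint: MiniConstraint, codeForConstraint: String)

final case class MiniCheck(description: String, constraints: Seq[MiniConstraint]) {
  // Evaluate every constraint against one column of nullable values.
  def run(column: Seq[Option[Int]]): Seq[(String, Boolean)] =
    constraints.map(c => c.name -> c.holds(column))
}

object FlattenDemo {
  // Suggestions grouped by column name, mirroring a Map[String, Seq[...]].
  val suggestions: Map[String, Seq[MiniSuggestion]] = Map(
    "Id" -> Seq(
      MiniSuggestion(MiniConstraint("isComplete(Id)", _.forall(_.isDefined)), ".isComplete(\"Id\")"),
      MiniSuggestion(MiniConstraint("isNonNegative(Id)", _.flatten.forall(_ >= 0)), ".isNonNegative(\"Id\")")
    )
  )

  // Keep the typed constraint objects; the code strings are only for display.
  val allConstraints: Seq[MiniConstraint] =
    suggestions.flatMap { case (_, s) => s.map(_.constraint) }.toSeq

  val check: MiniCheck = MiniCheck("generated constraints", allConstraints)

  // One null value: completeness fails, non-negativity holds.
  val results: Seq[(String, Boolean)] = check.run(Seq(Some(1), Some(2), None))
}
```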

Update 1:

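The suggestion result actually provides the constraint objects directly. If you replace the lines above with the following, your code should work: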

    val allConstraints = suggestionResult.constraintSuggestions
      .flatMap { case (_, suggestions) => suggestions.map { _.constraint } }
      .toSeq

    val generatedCheck = Check(CheckLevel.Error, "generated constraints", allConstraints)

    val verificationResult = VerificationSuite()
      .onData(datasource)
      .addChecks(Seq(generatedCheck))
      .run()
answered 2019-09-05T14:04:41.030