1

我需要运行数据质量测试，为此使用了 Amazon Deequ。通过下面的代码，我可以得到每项数据质量检查的成功/失败状态；但接下来我想获取所有未通过检查的行，并将它们存储到另一个 DataFrame 或 Hive 表中。请问该如何实现？另外，能否用 Amazon Deequ 同时对多个数据集执行检查？下面是我目前的代码，我需要的是获取并存储这些失败记录的代码。

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus

object Test {

  // Imports needed by this entry point; kept local so the object compiles on its own.
  import com.amazon.deequ.VerificationResult
  import org.apache.spark.sql.{Column, DataFrame, SparkSession}
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{col, concat_ws, count, lit, when}

  /**
   * Runs the Deequ integrity and distribution checks on a small sample
   * dataset, prints the per-constraint results, then prints the individual
   * rows that violate the row-level integrity rules.
   *
   * Uses an explicit `main` instead of `extends App`: Spark's documentation
   * warns that subclasses of `scala.App` may not work correctly.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("amazon-deequ-test")
      .getOrCreate()

    try {
      // Raw tuples get their own name; the original reused `data` for both the
      // Seq and the DataFrame, which does not compile.
      val rows = Seq(
        (1, "Thingy A", "awesome thing.", "high", 0),
        (2, "Thingy B", "available at http://thingb.com", null, 0),
        (3, null, null, "low", 5),
        (4, "Thingy D", "checkout https://thingd.ca", "low", -10),
        (5, "Thingy E", null, "high", 12))

      val cols = Seq("id", "productName", "description", "priority", "numViews")
      val data = spark.createDataFrame(rows).toDF(cols: _*)
      data.show(false)

      val verificationResult: VerificationResult = VerificationSuite()
        .onData(data)
        .addCheck(
          Check(CheckLevel.Error, "integrity checks")
            // we expect 5 records
            .hasSize(_ == 5)
            // 'id' should never be NULL
            .isComplete("id")
            // 'id' should not contain duplicates
            .isUnique("id")
            // 'productName' should never be NULL
            .isComplete("productName")
            // 'priority' should only contain the values "high" and "low"
            .isContainedIn("priority", Array("high", "low"))
            // 'numViews' should not contain negative values
            .isNonNegative("numViews"))
        .addCheck(
          Check(CheckLevel.Warning, "distribution checks")
            // at least half of the 'description's should contain a url
            .containsURL("description", _ >= 0.5)
            // half of the items should have less than 10 'numViews'
            .hasApproxQuantile("numViews", 0.5, _ <= 10))
        .run()

      println(s"Overall verification status: ${verificationResult.status}")

      // Aggregate view: one row per constraint with its status and message.
      // `show` returns Unit, so the DataFrame is bound first and shown separately.
      val resultDataFrame = VerificationResult.checkResultsAsDataFrame(spark, verificationResult)
      resultDataFrame.show(false)

      // Row-level view: only run the (more expensive) per-row scan when the
      // checks did not all pass.
      if (verificationResult.status != CheckStatus.Success) {
        val failedRows = integrityFailures(data)
        failedRows.show(false)
        // To persist the failures, e.g.:
        //   failedRows.write.mode("overwrite").saveAsTable("dq_failed_rows")
        // NOTE(review): writing into a real Hive metastore requires building the
        // session with `.enableHiveSupport()`; confirm the target database/table.
      }
    } finally {
      // Release the local Spark context even if a check or action throws.
      spark.stop()
    }
  }

  /**
   * Returns the rows of `data` that break at least one row-level rule of the
   * "integrity checks" Check. It adds a `failedConstraints` column that lists the
   * broken rules, comma separated.
   *
   * Rules:
   *  - a null `id` fails;
   *  - an `id` that appears more than once fails;
   *  - a null `productName` fails;
   *  - a `priority` outside ("high", "low") fails;
   *  - a negative `numViews` fails.
   *
   * A null `priority` or `numViews` is not flagged: the comparison yields null,
   * so `when` yields null and the rule is skipped.
   * NOTE(review): this is meant to mirror Deequ's isContainedIn / isNonNegative
   * null handling; confirm against the Deequ version in use.
   *
   * `hasSize` and the warning-level distribution checks are dataset-wide
   * metrics with no per-row verdict, so they are not represented here. Keep
   * this rule list in sync with the Check definition in `main`.
   *
   * @param data the dataset that was verified
   * @return the violating rows (original columns plus `failedConstraints`)
   */
  private def integrityFailures(data: DataFrame): DataFrame = {
    val idCount = "_idCount"
    val rules: Seq[(Column, String)] = Seq(
      (col("id").isNull, "id is null"),
      (col(idCount) > 1, "id is not unique"),
      (col("productName").isNull, "productName is null"),
      (!col("priority").isin("high", "low"), "priority not in (high, low)"),
      (col("numViews") < 0, "numViews is negative"))

    data
      // Number of rows sharing each id, to detect duplicates without a self-join.
      .withColumn(idCount, count(lit(1)).over(Window.partitionBy("id")))
      // concat_ws skips nulls, so only the messages of violated rules remain;
      // a row that passes every rule ends up with an empty string.
      .withColumn(
        "failedConstraints",
        concat_ws(", ", rules.map { case (violated, msg) => when(violated, msg) }: _*))
      .where(col("failedConstraints") =!= "")
      .drop(idCount)
  }
}
4

0 回答 0