我需要运行数据质量测试,所以我为此使用 Amazon Deequ。我可以使用下面的代码找到数据质量成功/失败状态,但接下来我想获取所有检查失败的行并将其存储到另一个 DataFrame/Hive 表中。请帮助我如何获得它。我们还可以同时在多个数据集上执行 Amazon Deequ 吗?下面是运行代码,需要帮助来获取存储错误失败记录的代码。
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus
object Test extends App {
val spark = SparkSession.builder()
.master("local[*]")
.appName("amazon-deequ-test")
.getOrCreate();
val data = Seq((1, "Thingy A", "awesome thing.", "high", 0),
(2, "Thingy B", "available at http://thingb.com", null, 0),
(3, null, null, "low", 5),
(4, "Thingy D", "checkout https://thingd.ca", "low", -10),
(5, "Thingy E", null, "high", 12))
val cols = Seq("id", "productName", "description", "priority", "numViews")
val data = spark.createDataframe(data).toDF(cols: _*)
data.show(false)
val verificationResult: verificationResult = VerificationSuite() {
VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "integrity checks")
// we expect 5 records
.hasSize(_ == 5)
// 'id' should never be NULL
.isComplete("id")
// 'id' should not contain duplicates
.isUnique("id")
// 'productName' should never be NULL
.isComplete("productName")
// 'priority' should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
// 'numViews' should not contain negative values
.isNonNegative("numViews"))
.addCheck(
Check(CheckLevel.Warning, "distribution checks")
// at least half of the 'description's should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 'numViews'
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
}
val resultDataFrame = checkResultAsDataFrame(spark, verificationResult).show(false)
}