
I'm looking at Deequ, and it looks like a really nice library. I'd like to know whether it is possible to load constraints from a csv file or an orc table in HDFS?

Suppose I have a table with these types:

case class Item(
  id: Long,
  productName: String,
  description: String,
  priority: String,
  numViews: Long
)

I want to set up constraints like this:

val checks = Check(CheckLevel.Error, "unit testing my data")
                  .isComplete("id") // should never be NULL
                  .isUnique("id") // should not contain duplicates

But I want to load .isComplete("id") and .isUnique("id") from a csv file, so that the business can add constraints and we can run the tests based on their input.


val verificationResult = VerificationSuite()
  .onData(data)
  .addChecks(Seq(checks))
  .run()

I have already managed to get constraints from SuggestionResult.constraintSuggestions:

val allConstraints = suggestionResult.constraintSuggestions
      .flatMap { case (_, suggestions) => suggestions.map { _.constraint }}
      .toSeq

It gives a list such as:

allConstraints = List(CompletenessConstraint(Completeness(id,None)), ComplianceConstraint(Compliance('id' has no negative values,id >= 0,None)))

But this is generated from SuggestionResult.constraintSuggestions, whereas I want to be able to create such a list based on input from a csv file. Can anyone help me?

To summarize: basically I just want to add:

val checks = Check(CheckLevel.Error, "unit testing my data")
  .isComplete("columnName1")
  .isUnique("columnName1")
  .isComplete("columnName2")

dynamically, based on a file that looks like this:

columnName;isUnique;isComplete (header)
columnName1;true;true
columnName2;false;true

2 Answers


I chose to store the CSV in src/main/resources, because it is easy to read from there and easy to maintain in parallel with the code being QA'ed.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

def readCSV(spark: SparkSession, filename: String): DataFrame = {
  import spark.implicits._

  // getResourceAsStream returns null (rather than throwing) when the file
  // is missing, so wrap it in Option instead of Try
  val inputFileStream =
    Option(this.getClass.getResourceAsStream("/" + filename))
      .getOrElse(
        throw new Exception("Cannot find " + filename + " in src/main/resources")
      )

  val readLines =
    scala.io.Source.fromInputStream(inputFileStream).getLines.toList

  val csvData: Dataset[String] =
    spark.sparkContext.parallelize(readLines).toDS

  spark.read.option("header", true).option("inferSchema", true).csv(csvData)
}
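
For example, assuming the constraints file is saved as constraints.csv (a hypothetical name) under src/main/resources:

//hypothetical filename; adjust to wherever the constraints CSV is kept
val constraintsDF = readCSV(spark, "constraints.csv")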

This loads the file as a DataFrame, which can easily be passed to the sample code from gavincruick on GitHub, copied here for convenience:

import scala.util.Try
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
import com.amazon.deequ.VerificationResult
import org.apache.spark.sql.DataFrame

//code to build verifier from DF that has a 'Constraint' column
type Verifier = DataFrame => VerificationResult

def generateVerifier(df: DataFrame, columnName: String): Try[Verifier] = {

  // collect the constraint snippets, e.g. ".isComplete(\"Area\")", from the DataFrame
  val constraintCheckCodes: Seq[String] =
    df.select(columnName).collect().map(_(0).toString).toSeq

  // wrap each snippet in its own Check, using the index as the description
  def checkSrcCode(checkCodeMethod: String, id: Int): String =
    s"""com.amazon.deequ.checks.Check(com.amazon.deequ.checks.CheckLevel.Error, "$id")$checkCodeMethod"""

  val verifierSrcCode =
    s"""{
       |import com.amazon.deequ.constraints.ConstrainableDataTypes
       |import com.amazon.deequ.{VerificationResult, VerificationSuite}
       |import org.apache.spark.sql.DataFrame
       |
       |val checks = Seq(
       |  ${constraintCheckCodes.zipWithIndex
             .map { (checkSrcCode _).tupled }
             .mkString(",\n  ")}
       |)
       |
       |(data: DataFrame) => VerificationSuite().onData(data).addChecks(checks).run()
       |}""".stripMargin.trim

  println(s"Verification function source code:\n$verifierSrcCode\n")

  compile[Verifier](verifierSrcCode)
}

/** Compiles the scala source code that, when evaluated, produces a value of type T. */
def compile[T](source: String): Try[T] =
  Try {
    val toolbox = currentMirror.mkToolBox()
    val tree = toolbox.parse(source)
    val compiledCode = toolbox.compile(tree)
    compiledCode().asInstanceOf[T]
  }

//example usage...

//sample test data (needs import spark.implicits._ in scope for toDF)
val testDataDF = Seq(
  ("2020-02-12", "England", "E10000034", "Worcestershire", 1),
  ("2020-02-12", "Wales", "W11000024", "Powys", 0),
  ("2020-02-12", "Wales", null, "Unknown", 1),
  ("2020-02-12", "Canada", "MADEUP", "Ontario", 1)
).toDF("Date", "Country", "AreaCode", "Area", "TotalCases")

//constraints in a DF
val constraintsDF = Seq(
  ".isComplete(\"Area\")",
  ".isComplete(\"Country\")",
  ".isComplete(\"TotalCases\")",
  ".isComplete(\"Date\")",
  ".hasCompleteness(\"AreaCode\", _ >= 0.80, Some(\"It should be above 0.80!\"))",
  ".isContainedIn(\"Country\", Array(\"England\", \"Scotland\", \"Wales\", \"Northern Ireland\"))"
).toDF("Constraint")

//Build Verifier from constraints DF
val verifier = generateVerifier(constraintsDF, "Constraint").get

//Run verifier against a sample DF 
val result = verifier(testDataDF)

//display results
VerificationResult.checkResultsAsDataFrame(spark, result).show()
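
If the verifier runs as part of a pipeline, the overall outcome can also be checked programmatically rather than just displayed. A small sketch using Deequ's CheckStatus on the result value from above:

//optionally gate the pipeline on the overall outcome
import com.amazon.deequ.checks.CheckStatus

if (result.status != CheckStatus.Success) {
  println("Some data quality checks failed!")
}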
answered 2021-11-02

It depends on how complex you want to allow the constraints to be. In general, Deequ lets you use arbitrary Scala code for a constraint's validation function, so loading that from a file is difficult (and dangerous from a security point of view).

I think you will have to come up with your own schema and semantics for the CSV file; at least Deequ does not support this directly.
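
For the simple per-column flags in the question (columnName;isUnique;isComplete), such a hand-rolled mapping can stay very small. A minimal sketch, assuming that semicolon-separated format and only those two constraint types (the checksFromCsv name and the constraints.csv path are made up for illustration):

import com.amazon.deequ.checks.{Check, CheckLevel}

//build one Check from rows of the form: columnName;isUnique;isComplete
def checksFromCsv(lines: Seq[String]): Check =
  lines.drop(1) // skip the header row
    .map(_.split(";").map(_.trim))
    .foldLeft(Check(CheckLevel.Error, "constraints loaded from csv")) {
      // malformed rows (not exactly three fields) will throw a MatchError
      case (check, Array(column, isUnique, isComplete)) =>
        val withUnique = if (isUnique.toBoolean) check.isUnique(column) else check
        if (isComplete.toBoolean) withUnique.isComplete(column) else withUnique
    }

//usage: read the file however it is stored, e.g. with plain scala.io
val checks = checksFromCsv(
  scala.io.Source.fromFile("constraints.csv").getLines().toSeq)

This keeps the file format declarative (no code in the CSV), which avoids the security problem of compiling arbitrary snippets, at the cost of supporting only the constraint types you explicitly map.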

answered 2019-10-22