0

我刚开始使用 pydeequ,我想为具有约 1800 个功能的 spark 数据框创建检查。现在要知道我必须执行哪些检查,我执行以下操作

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

在上面我得到了关于我可以对我的数据进行的所有检查的建议。现在目标是2折。

  1. 我可能想运行由提供的检查suggestionResult
  2. 我可能想对一系列功能进行特定检查,例如 NonNegative、Unique 检查。

我完全不确定该怎么做,在尝试了几种方法之后,它仍然不起作用,虽然我知道它当然可以一次运行所有建议检查,但只能在 scala 中看到这个(我需要根据我的观点在 pydeequ 中执行此操作1)

我确实尝试过以下方式,但没有奏效。在重复分析仪上给了我一个错误

check_list = [check.isNonNegative,check.isPositive]
checkResultBuilder = VerificationSuite(spark).onData(df)
for col in sub_cols:
    checkResultBuilder = reduce(
    lambda vbuilder,checker: vbuilder.addCheck(checker(col)),check_list,checkResultBuilder)

checkResultBuilder.run()
4

1 回答 1

0

您可以使用此处列出的方法https://github.com/awslabs/python-deequ/issues/23,然后将参数作为名为 args 的列表传递,并将其解压缩为 *args。

Constraint Suggestion Runner 返回一个字典,其中包含 constraint_suggestions 键中的约束,您可以通过在字典中进一步阅读一些工作来进一步解压缩。

使用 eval(str) 将额外参数的字符串形式转换为适当的对象,并使用 get(attr) 添加给定名称的约束。

for item in parameters:
   args.append(eval(str(item)))

check.addConstraint(getattr(check, constraint)(*args))
于 2021-07-12T23:54:29.830 回答