
I am trying to create a very simple expectation with the Great Expectations v3 API: expect_column_values_to_be_positive. I am using the PandasExecutionEngine, and my data asset is a pandas DataFrame.

my_custom_expectation.py lives in the plugins/ folder.
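
For context, plugins/ here is the directory that my great_expectations.yml points at (a minimal excerpt, assuming the default project layout):

# great_expectations.yml (excerpt)
plugins_directory: plugins/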

Here is my code in my_custom_expectation.py:

from great_expectations.execution_engine import (
    PandasExecutionEngine,
)
from great_expectations.expectations.metrics import (
    ColumnMapMetricProvider,
    column_condition_partial
)
from great_expectations.expectations.expectation import (
    ColumnMapExpectation,
)


def check_positive(value):
    if value:
        return True if value > 0 else False
    return True


class ColumnValueIsPositive(ColumnMapMetricProvider):
    condition_metric_name = "column_values.to_be_positive"

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        return column.apply(lambda x: check_positive(x))


class ExpectColumnValuesToBePositive(ColumnMapExpectation):
    map_metric = "column_values.to_be_positive"

Then, in my Jupyter notebook, I try to create my expectation:

from my_custom_expectation import ExpectColumnValuesToBePositive

validator.expect_column_values_to_be_positive(column="duration")

However, I get the following error:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_5957/859745029.py in <module>
----> 1 validator.expect_column_values_to_be_positive(column="duration")

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/validator/validator.py in inst_expectation(*args, **kwargs)
    285 
    286                 else:
--> 287                     raise err
    288             return validation_result
    289 

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/validator/validator.py in inst_expectation(*args, **kwargs)
    240                     )
    241                 else:
--> 242                     validation_result = expectation.validate(
    243                         validator=self,
    244                         evaluation_parameters=self._expectation_suite.evaluation_parameters,

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/expectations/expectation.py in validate(self, validator, configuration, evaluation_parameters, interactive_evaluation, data_context, runtime_configuration)
    631             evaluation_parameters, interactive_evaluation, data_context
    632         )
--> 633         evr = validator.graph_validate(
    634             configurations=[configuration],
    635             runtime_configuration=runtime_configuration,

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/validator/validator.py in graph_validate(self, configurations, metrics, runtime_configuration)
    499                 return evrs
    500             else:
--> 501                 raise err
    502 
    503         for configuration in processed_configurations:

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/validator/validator.py in graph_validate(self, configurations, metrics, runtime_configuration)
    477         # an exception occurring as part of resolving the combined validation graph impacts all expectations in suite.
    478         try:
--> 479             self.resolve_validation_graph(
    480                 graph=graph,
    481                 metrics=metrics,

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/validator/validator.py in resolve_validation_graph(self, graph, metrics, runtime_configuration)
    555 
    556             metrics.update(
--> 557                 self._resolve_metrics(
    558                     execution_engine=self._execution_engine,
    559                     metrics_to_resolve=ready_metrics,

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/validator/validator.py in _resolve_metrics(execution_engine, metrics_to_resolve, metrics, runtime_configuration)
    603         """A means of accessing the Execution Engine's resolve_metrics method, where missing metric configurations are
    604         resolved"""
--> 605         return execution_engine.resolve_metrics(
    606             metrics_to_resolve=metrics_to_resolve,
    607             metrics=metrics,

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/execution_engine/execution_engine.py in resolve_metrics(self, metrics_to_resolve, metrics, runtime_configuration)
    283                 # than data to optimize compute in the future
    284                 try:
--> 285                     resolved_metrics[metric_to_resolve.id] = metric_fn(
    286                         **metric_provider_kwargs
    287                     )

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/expectations/metrics/metric_provider.py in inner_func(*args, **kwargs)
     53         @wraps(metric_fn)
     54         def inner_func(*args, **kwargs):
---> 55             return metric_fn(*args, **kwargs)
     56 
     57         inner_func.metric_engine = engine

~/.local/share/virtualenvs/ge-YbASoQtb/lib/python3.8/site-packages/great_expectations/expectations/metrics/map_metric_provider.py in inner_func(cls, execution_engine, metric_domain_kwargs, metric_value_kwargs, metrics, runtime_configuration)
    326                     df = df[df[column_name].notnull()]
    327 
--> 328                 meets_expectation_series = metric_fn(
    329                     cls,
    330                     df[column_name],

TypeError: _pandas() got an unexpected keyword argument '_metrics'

Am I missing something? I followed this example to write my expectation/metric.


1 Answer


I was eventually able to create my own expectation by copy-pasting everything from here. So my code ended up looking like this:

from typing import Dict
from great_expectations.core.expectation_configuration import parse_result_format
from great_expectations.expectations.expectation import (
    ColumnMapExpectation,
    _format_map_output,
)
from great_expectations.expectations.util import render_evaluation_parameter_string
from great_expectations.render.renderer.renderer import renderer
from great_expectations.render.types import RenderedStringTemplateContent
from great_expectations.render.util import (
    num_to_str,
    parse_row_condition_string_pandas_engine,
    substitute_none_for_missing,
)
from typing import Optional

from great_expectations.core import ExpectationConfiguration
from great_expectations.execution_engine import (
    ExecutionEngine,
    PandasExecutionEngine,
)
from great_expectations.expectations.metrics.map_metric_provider import (
    ColumnMapMetricProvider,
    column_condition_partial,
)
from great_expectations.expectations.metrics.metric_provider import (
    MetricProvider,
    metric_value,
)
from great_expectations.validator.validation_graph import MetricConfiguration

class ColumnValuesPositive(ColumnMapMetricProvider):
    condition_metric_name = "column_values.positive"
    # filter_column_isnull = False

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        print("calling pandas func in custom expectstion ******")

        def check_positive(value):
            print("calling check_positive.....")
            if value:
                return True if value > 0 else False
            return True

        return column.apply(lambda x: check_positive(x))


class ColumnValuesPositiveCount(MetricProvider):
    """A convenience class to provide an alias for easier access to the null count in a column."""

    metric_name = "column_values.positive.count"

    @metric_value(engine=PandasExecutionEngine)
    def _pandas(*, metrics, **kwargs):
        return metrics["column_values.nonpositive.unexpected_count"]

    @classmethod
    def _get_evaluation_dependencies(
        cls,
        metric: MetricConfiguration,
        configuration: Optional[ExpectationConfiguration] = None,
        execution_engine: Optional[ExecutionEngine] = None,
        runtime_configuration: Optional[dict] = None,
    ):
        dependencies: dict = super()._get_evaluation_dependencies(
            metric=metric,
            configuration=configuration,
            execution_engine=execution_engine,
            runtime_configuration=runtime_configuration,
        )
        dependencies["column_values.nonpositive.unexpected_count"] = MetricConfiguration(
            metric_name="column_values.nonpositive.unexpected_count",
            metric_domain_kwargs=metric.metric_domain_kwargs,
        )
        return dependencies


class ExpectColumnValuesToBePositive(ColumnMapExpectation):
    """Expect column values to be positive.
    expect_column_values_to_be_positive is a \
    :func:`column_map_expectation <great_expectations.execution_engine.execution_engine.MetaExecutionEngine
    .column_map_expectation>`.
    Args:
        column (str): \
            The column name.
    Keyword Args:
        mostly (None or a float between 0 and 1): \
            Return `"success": True` if at least mostly fraction of values match the expectation. \
            For more detail, see :ref:`mostly`.
    Other Parameters:
        result_format (str or None): \
            Which output mode to use: `BOOLEAN_ONLY`, `BASIC`, `COMPLETE`, or `SUMMARY`.
            For more detail, see :ref:`result_format <result_format>`.
        include_config (boolean): \
            If True, then include the expectation config as part of the result object. \
            For more detail, see :ref:`include_config`.
        catch_exceptions (boolean or None): \
            If True, then catch exceptions and include them as part of the result object. \
            For more detail, see :ref:`catch_exceptions`.
        meta (dict or None): \
            A JSON-serializable dictionary (nesting allowed) that will be included in the output without \
            modification. For more detail, see :ref:`meta`.
    Returns:
        An ExpectationSuiteValidationResult
        Exact fields vary depending on the values passed to :ref:`result_format <result_format>` and
        :ref:`include_config`, :ref:`catch_exceptions`, and :ref:`meta`.
    """

    # This dictionary contains metadata for display in the public gallery
    library_metadata = {
        "maturity": "production",
        "package": "great_expectations",
        "tags": ["core expectation", "column map expectation"],
        "contributors": ["@great_expectations"],
        "requirements": [],
    }

    map_metric = "column_values.positive"

    @classmethod
    @renderer(renderer_type="renderer.prescriptive")
    @render_evaluation_parameter_string
    def _prescriptive_renderer(
        cls,
        configuration=None,
        result=None,
        language=None,
        runtime_configuration=None,
        **kwargs
    ):
        runtime_configuration = runtime_configuration or {}
        include_column_name = runtime_configuration.get("include_column_name", True)
        include_column_name = (
            include_column_name if include_column_name is not None else True
        )
        styling = runtime_configuration.get("styling")
        params = substitute_none_for_missing(
            configuration.kwargs,
            ["column", "mostly", "row_condition", "condition_parser"],
        )

        if params["mostly"] is not None:
            params["mostly_pct"] = num_to_str(
                params["mostly"] * 100, precision=15, no_scientific=True
            )
            # params["mostly_pct"] = "{:.14f}".format(params["mostly"]*100).rstrip("0").rstrip(".")
            template_str = "values must be positive, at least $mostly_pct % of the time."
        else:
            template_str = "values must be positive."

        if include_column_name:
            template_str = "$column " + template_str

        if params["row_condition"] is not None:
            (
                conditional_template_str,
                conditional_params,
            ) = parse_row_condition_string_pandas_engine(params["row_condition"])
            template_str = conditional_template_str + ", then " + template_str
            params.update(conditional_params)

        return [
            RenderedStringTemplateContent(
                **{
                    "content_block_type": "string_template",
                    "string_template": {
                        "template": template_str,
                        "params": params,
                        "styling": styling,
                    },
                }
            )
        ]

    @classmethod
    @renderer(renderer_type="renderer.diagnostic.observed_value")
    def _diagnostic_observed_value_renderer(
        cls,
        configuration=None,
        result=None,
        language=None,
        runtime_configuration=None,
        **kwargs
    ):
        result_dict = result.result

        try:
            notpositive_percent = result_dict["unexpected_percent"]
            return (
                num_to_str(100 - notpositive_percent, precision=5, use_locale=True)
                + "% positive"
            )
        except KeyError:
            return "unknown % positive"
        except TypeError:
            return "NaN% positive"

    def get_validation_dependencies(
        self,
        configuration: Optional[ExpectationConfiguration] = None,
        execution_engine: Optional[ExecutionEngine] = None,
        runtime_configuration: Optional[dict] = None,
    ):
        dependencies = super().get_validation_dependencies(
            configuration, execution_engine, runtime_configuration
        )

        return dependencies

    def _validate(
        self,
        configuration: ExpectationConfiguration,
        metrics: Dict,
        runtime_configuration: dict = None,
        execution_engine: ExecutionEngine = None,
    ):
        if runtime_configuration:
            result_format = runtime_configuration.get(
                "result_format",
                configuration.kwargs.get(
                    "result_format", self.default_kwarg_values.get("result_format")
                ),
            )
        else:
            result_format = configuration.kwargs.get(
                "result_format", self.default_kwarg_values.get("result_format")
            )
        mostly = self.get_success_kwargs().get(
            "mostly", self.default_kwarg_values.get("mostly")
        )
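        # Note: for ColumnMapExpectation, default_kwarg_values sets "mostly" to 1,
        # so unless a caller overrides it, every evaluated value must be positive.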
        total_count = metrics.get("table.row_count")
        unexpected_count = metrics.get(self.map_metric + ".unexpected_count")

        if total_count is None or total_count == 0:
            # Vacuously true
            success = True
        else:
            success_ratio = (total_count - unexpected_count) / total_count
            success = success_ratio >= mostly

        nonnull_count = None

        return _format_map_output(
            result_format=parse_result_format(result_format),
            success=success,
            element_count=metrics.get("table.row_count"),
            nonnull_count=nonnull_count,
            unexpected_count=metrics.get(self.map_metric + ".unexpected_count"),
            unexpected_list=metrics.get(self.map_metric + ".unexpected_values"),
            unexpected_index_list=metrics.get(
                self.map_metric + ".unexpected_index_list"
            ),
        )
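
For reference, once this file is saved under plugins/ (e.g. plugins/my_custom_expectation.py), the expectation can be invoked from a notebook roughly like this (a minimal sketch; validator comes from your existing batch request / expectation suite setup, and mostly=0.95 is just an illustrative threshold):

# importing the module registers the expectation with Great Expectations' registry
from my_custom_expectation import ExpectColumnValuesToBePositive

result = validator.expect_column_values_to_be_positive(column="duration", mostly=0.95)
print(result.success)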
answered Sep 20, 2021 at 18:51