Summary: Dagster run config for Dagit vs. PyTest appears to be incompatible with my project

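I keep running into errors when I run pytest on my pipeline, and I would really appreciate any pointers. I consistently get errors of the following form: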

dagster.core.errors.DagsterInvalidConfigError: 
Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
Error 1: Undefined field "myfunc_df_to_list" at path root:solids. 
Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } 
write_myfunc_to_redis?:..."

A few notes about the project:

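  • dagster, version 0.9.15
  • My pipeline runs in Dagit without errors for the same configuration
  • The unit tests run against the individual solids that make up the pipeline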

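Failed solutions: I have tried populating the configuration file with solids that define the outputs each pytest error suggests, but every attempt led to an error more opaque than the previous one.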

My solids are:

@solid(required_resource_keys={"db"})
def get_myfunc_df(context, query: String) -> myfuncDF:
    ...  # do something
    return myfuncDF

@solid
def myfunc_df_to_list(context, df: myfuncDF) -> List:
    ...  # do something
    return List

@solid(required_resource_keys={"redis"})
def write_myfunc_to_redis(context, myfunc_list: List) -> None:
    ...  # write to redis
    return None

My pipeline is a chain of these solids:

@pipeline(
    mode_defs=filter_modes(MODES),
    preset_defs=filter_presets(PRESETS),
    tags={"type": "myproject"},
)
def myfunc_to_redis_pipeline():
    df = get_myfunc_df()
    myfunc_list = myfunc_df_to_list(df)
    write_myfunc_to_redis(myfunc_list)

My test code in test_main.py is:

    @pytest.mark.myfunc
    def test_myfunc_to_redis_pipeline(self):
        res = execute_pipeline(myfunc_to_redis_pipeline,
                               preset="test",)
        assert res.success
        assert len(res.solid_result_list) == 4
        for solid_res in res.solid_result_list:
            assert solid_res.success

where the preset "test" is defined with the run config from a YAML file:

resources:
  db:
    config:
      file_path: test.csv

^ This is where it throws the most errors. I have been iterating through different permutations of solids to add, for example:

solids:
  get_myfunc_df:
    inputs:
      query:
        value: select 1

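But that has not solved the problem. When running in Dagit, only the solids' inputs need to be defined, so is there any reason the solids' outputs would need to be defined for testing?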

Does this error indicate that something else is wrong?

Edit: here is the stack trace from tox --verbose

self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_myfunc_df>

    @pytest.mark.myfunc
    def test_myfunc_df(self):
        """myfunc"""
        result = execute_solid(
            get_myfunc_df,
            mode_def=test_mode,
            run_config=run_config,
>           input_values={"query": "SELECT 1"},
        )

repos/myfunc/myfunc/dagster/tests/test_main.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
    result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
    tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
    pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
    environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x1359f6210>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'

    @staticmethod
    def build(pipeline_def, run_config=None, mode=None):
        """This method validates a given run config against the pipeline config schema. If
        successful, we instantiate an EnvironmentConfig object.

        In case the run_config is invalid, this method raises a DagsterInvalidConfigError
        """
        from dagster.config.validate import process_config
        from dagster.core.definitions.executor import ExecutorDefinition
        from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
        from dagster.core.definitions.system_storage import SystemStorageDefinition
        from .composite_descent import composite_descent

        check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
        run_config = check.opt_dict_param(run_config, "run_config")
        check.opt_str_param(mode, "mode")

        mode = mode or pipeline_def.get_default_mode_name()
        environment_type = create_environment_type(pipeline_def, mode)

        config_evr = process_config(environment_type, run_config)
        if not config_evr.success:
            raise DagsterInvalidConfigError(
                "Error in config for pipeline {}".format(pipeline_def.name),
                config_evr.errors,
>               run_config,
            )
E           dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_get_myfunc_df_solid_pipeline
E               Error 1: Undefined field "inputs" at path root:solids:get_myfunc_df. Expected: "{ outputs?: [{ result?: { csv: { path: (String | { env: String }) sep?: (String | { env: String }) } parquet: { path: (String | { env: String }) } pickle: { path: (String | { env: String }) } table: { path: (String | { env: String }) } } }] }".

.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
_______________________________________________________________________ Test_myfunc.test_write_myfunc_to_redis ________________________________________________________________________

self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_write_myfunc_to_redis>

    @pytest.mark.myfunc
    def test_write_myfunc_to_redis(self):
        """Test redis write"""
        records = [
            ("k", "v"),
            ("k2", "v2"),
        ]
        result = execute_solid(
            write_myfunc_to_redis,
            mode_def=test_mode,
            input_values={"myfunc_list": records},
>           run_config=run_config,
        )

repos/myfunc/myfunc/dagster/tests/test_main.py:56:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
    result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
    tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
    pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
    environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x135d39490>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'

    @staticmethod
    def build(pipeline_def, run_config=None, mode=None):
        """This method validates a given run config against the pipeline config schema. If
        successful, we instantiate an EnvironmentConfig object.

        In case the run_config is invalid, this method raises a DagsterInvalidConfigError
        """
        from dagster.config.validate import process_config
        from dagster.core.definitions.executor import ExecutorDefinition
        from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
        from dagster.core.definitions.system_storage import SystemStorageDefinition
        from .composite_descent import composite_descent

        check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
        run_config = check.opt_dict_param(run_config, "run_config")
        check.opt_str_param(mode, "mode")

        mode = mode or pipeline_def.get_default_mode_name()
        environment_type = create_environment_type(pipeline_def, mode)

        config_evr = process_config(environment_type, run_config)
        if not config_evr.success:
            raise DagsterInvalidConfigError(
                "Error in config for pipeline {}".format(pipeline_def.name),
                config_evr.errors,
>               run_config,
            )
E           dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
E               Error 1: Undefined field "get_myfunc_df" at path root:solids. Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } write_myfunc_to_redis?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".

.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
=============================================================================== short test summary info ===============================================================================
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_myfunc_df - dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeli...
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_write_myfunc_to_redis - dagster.core.errors.DagsterInvalidConfigError: Error in conf

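The solution below worked. The key issue was that the pipeline needs the solids defined in the config, while the solid tests were passing that same config together with input_values. My change was to remove "input_values" as an argument and pass the inputs through the run config instead. Since my intermediate solids need more complex objects and my config file is YAML, I added the following to all of my solid tests: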

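        # Assumes `import copy` and `import pandas as pd` at module level.
        this_solid_run_config = copy.deepcopy(run_config)
        # One row with two columns; a flat ['1', '2'] list would not match two column names.
        input_dict = {"df": pd.DataFrame([["1", "2"]], columns=["key", "value"])}
        # dict.update is shallow, so this replaces the whole "solids" section.
        this_solid_run_config.update(
            {"solids": {"myfunc_df_to_list": {"inputs": input_dict}}}
        )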
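
The same idea can be sketched as a small reusable helper that works on plain dicts. The name `config_for_solid` and the placeholder input value are hypothetical and not part of Dagster. The sketch assumes, as the second error in the stack trace suggests, that the ephemeral single-solid pipeline rejects `solids` entries for any solid other than the one under test.

```python
import copy


def config_for_solid(base_run_config, solid_name, inputs):
    """Return a copy of base_run_config whose `solids` section holds only solid_name.

    Hypothetical helper for illustration; not a Dagster API.
    """
    run_config = copy.deepcopy(base_run_config)  # never mutate the shared config
    # Assigning the whole "solids" key drops the entries for every other solid.
    run_config["solids"] = {solid_name: {"inputs": inputs}}
    return run_config


# Shared config, mirroring the YAML preset shown earlier.
base = {
    "resources": {"db": {"config": {"file_path": "test.csv"}}},
    "solids": {"get_myfunc_df": {"inputs": {"query": {"value": "select 1"}}}},
}

# "placeholder" stands in for whatever input config the solid's type accepts.
solid_config = config_for_solid(base, "myfunc_df_to_list", {"df": "placeholder"})
print(sorted(solid_config["solids"]))  # ['myfunc_df_to_list']
print(sorted(base["solids"]))          # ['get_myfunc_df']
```

Because `dict.update` is shallow, the snippet above has the same effect on the `solids` section: it replaces the section rather than merging into it, so the `get_myfunc_df` entry no longer reaches the single-solid test.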

1 Answer


Based on the stack trace, the failure comes from:

result = execute_solid(
            get_myfunc_df,
            mode_def=test_mode,
            run_config=run_config,
            input_values={"query": "SELECT 1"},
        )

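The solid input "query" should be passed through either the "input_values" argument or the "run_config" argument, but not both. If this does not solve your problem, I am happy to keep digging.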
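
For instance, if you would rather keep "input_values", the matching "inputs" entry has to leave the run config before it is passed in. Here is a minimal sketch using plain dicts. `without_solid_inputs` is a hypothetical helper, not a Dagster API, and the sample config only mirrors the shape shown in the question.

```python
import copy


def without_solid_inputs(run_config, solid_name):
    """Return a copy of run_config with solid_name's `inputs` entry removed.

    Hypothetical helper for illustration; not a Dagster API.
    """
    cleaned = copy.deepcopy(run_config)  # leave the shared config untouched
    solids = cleaned.get("solids", {})
    solid_cfg = solids.get(solid_name)
    if solid_cfg is not None:
        solid_cfg.pop("inputs", None)
        if not solid_cfg:  # nothing else configured for this solid
            del solids[solid_name]
    if "solids" in cleaned and not cleaned["solids"]:
        del cleaned["solids"]  # drop the section once it is empty
    return cleaned


shared = {
    "resources": {"db": {"config": {"file_path": "test.csv"}}},
    "solids": {"get_myfunc_df": {"inputs": {"query": {"value": "select 1"}}}},
}
cleaned = without_solid_inputs(shared, "get_myfunc_df")
print("solids" in cleaned)  # False

# Intended use in the test from the question (not executed here):
# result = execute_solid(
#     get_myfunc_df,
#     mode_def=test_mode,
#     run_config=without_solid_inputs(run_config, "get_myfunc_df"),
#     input_values={"query": "SELECT 1"},
# )
```

The other option is the reverse: leave the run config as it is and drop the "input_values" argument from the call.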

Answered 2020-11-19T16:35:46.733