摘要:Dagit 与 PyTest 的 Dagster 运行配置似乎与我的项目不兼容
我在对管道运行 pytest 时一直遇到错误,非常感谢任何提示或建议。我一直收到如下形式的错误:
dagster.core.errors.DagsterInvalidConfigError:
Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
Error 1: Undefined field "myfunc_df_to_list" at path root:solids.
Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }
write_myfunc_to_redis?:..."
关于项目的几点说明:
- Dagster,版本 0.9.15
- 使用相同的配置,我的管道在 Dagit 中运行时没有任何错误
- 单元测试是针对构成管道的各个 solid 分别运行的
尝试过但失败的方案:我按照每个 pytest 错误的提示,在配置文件中为各个 solid 补充了 outputs 定义,但每次得到的错误都比之前的更难理解。
我的 solid 如下:
# Source solid of the pipeline: requires the "db" resource, takes a single
# `query` input (String) and is annotated to return a myfuncDF.
# The body is elided in the question ("do something" is a placeholder).
# In the pipeline this solid is called with no arguments, so `query` has no
# upstream solid and must be supplied through run config
# (solids.get_myfunc_df.inputs.query) or execute_solid(input_values=...).
@solid(required_resource_keys={"db"})
def get_myfunc_df(context, query: String) -> myfuncDF:
do something
return myfuncDF
# Middle solid: receives the myfuncDF produced upstream (input name `df`) and
# is annotated to return a List.
# NOTE(review): the body is a placeholder from the question — `return List`
# returns the type object itself, not a list; presumably the real solid
# returns the records converted from `df`.
@solid
def myfunc_df_to_list(context, df: myfuncDF) -> List:
do something
return List
# Sink solid: requires the "redis" resource and receives `myfunc_list`
# (the output of myfunc_df_to_list); annotated to return None.
# NOTE: the last line collapses two placeholder statements from the question
# ("write to redis" and "return None").
@solid(required_resource_keys={"redis"})
def write_myfunc_to_redis(context, myfunc_list:List) -> None:
write to redis return None
我的管道就是把这些 solid 串联起来:
@pipeline(
    mode_defs=filter_modes(MODES),
    preset_defs=filter_presets(PRESETS),
    tags={"type": "myproject"},
)
def myfunc_to_redis_pipeline():
    # Linear chain of three solids: query -> DataFrame -> list -> Redis.
    # (Comments rather than a docstring: @pipeline would use a docstring as
    # the pipeline description.)
    myfunc_df = get_myfunc_df()
    records = myfunc_df_to_list(myfunc_df)
    write_myfunc_to_redis(records)
我在 test_main.py 中的测试代码如下:
@pytest.mark.myfunc
def test_myfunc_to_redis_pipeline(self):
    """Run the whole pipeline with the "test" preset; every solid must succeed."""
    res = execute_pipeline(myfunc_to_redis_pipeline, preset="test")
    assert res.success
    # myfunc_to_redis_pipeline composes exactly three solids
    # (get_myfunc_df -> myfunc_df_to_list -> write_myfunc_to_redis), so the
    # previous expectation of 4 results could never hold for this definition.
    assert len(res.solid_result_list) == 3
    for solid_res in res.solid_result_list:
        assert solid_res.success
其中名为“test”的预设(preset)使用 yaml 文件中的运行配置来定义:
# Resource section of the run config used by the "test" preset.
# Nesting is resources -> db -> config -> file_path; when flattened to one
# column, YAML would instead read four unrelated top-level keys.
resources:
  db:
    config:
      file_path: test.csv
^ 这里是报错最多的地方。我一直在尝试为不同的 solid 组合添加配置,例如:
# Supplies the unconnected `query` input of get_myfunc_df.
# Only valid for a pipeline that actually contains get_myfunc_df and where
# `query` is not also passed via execute_solid(input_values=...): the
# ephemeral pipelines that execute_solid builds reject this section
# ("Undefined field" errors).
solids:
  get_myfunc_df:
    inputs:
      query:
        value: select 1
但问题仍未解决。在 Dagit 中运行时,只需要为 solid 定义输入;那么在测试中,是否有什么理由需要为 solid 定义输出?
这个错误是否说明存在别的问题?
编辑:这是来自 tox --verbose 的堆栈跟踪
self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_myfunc_df>
@pytest.mark.myfunc
def test_myfunc_df(self):
"""myfunc"""
result = execute_solid(
get_myfunc_df,
mode_def=test_mode,
run_config=run_config,
> input_values={"query": "SELECT 1"},
)
repos/myfunc/myfunc/dagster/tests/test_main.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x1359f6210>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'
@staticmethod
def build(pipeline_def, run_config=None, mode=None):
"""This method validates a given run config against the pipeline config schema. If
successful, we instantiate an EnvironmentConfig object.
In case the run_config is invalid, this method raises a DagsterInvalidConfigError
"""
from dagster.config.validate import process_config
from dagster.core.definitions.executor import ExecutorDefinition
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.system_storage import SystemStorageDefinition
from .composite_descent import composite_descent
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
mode = mode or pipeline_def.get_default_mode_name()
environment_type = create_environment_type(pipeline_def, mode)
config_evr = process_config(environment_type, run_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for pipeline {}".format(pipeline_def.name),
config_evr.errors,
> run_config,
)
E dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_get_myfunc_df_solid_pipeline
E Error 1: Undefined field "inputs" at path root:solids:get_myfunc_df. Expected: "{ outputs?: [{ result?: { csv: { path: (String | { env: String }) sep?: (String | { env: String }) } parquet: { path: (String | { env: String }) } pickle: { path: (String | { env: String }) } table: { path: (String | { env: String }) } } }] }".
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
_______________________________________________________________________ Test_myfunc.test_write_myfunc_to_redis ________________________________________________________________________
self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_write_myfunc_to_redis>
@pytest.mark.myfunc
def test_write_myfunc_to_redis(self):
"""Test redis write"""
records = [
("k", "v"),
("k2", "v2"),
]
result = execute_solid(
write_myfunc_to_redis,
mode_def=test_mode,
input_values={"myfunc_list": records},
> run_config=run_config,
)
repos/myfunc/myfunc/dagster/tests/test_main.py:56:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x135d39490>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'
@staticmethod
def build(pipeline_def, run_config=None, mode=None):
"""This method validates a given run config against the pipeline config schema. If
successful, we instantiate an EnvironmentConfig object.
In case the run_config is invalid, this method raises a DagsterInvalidConfigError
"""
from dagster.config.validate import process_config
from dagster.core.definitions.executor import ExecutorDefinition
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.system_storage import SystemStorageDefinition
from .composite_descent import composite_descent
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
mode = mode or pipeline_def.get_default_mode_name()
environment_type = create_environment_type(pipeline_def, mode)
config_evr = process_config(environment_type, run_config)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for pipeline {}".format(pipeline_def.name),
config_evr.errors,
> run_config,
)
E dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
E Error 1: Undefined field "get_myfunc_df" at path root:solids. Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } write_myfunc_to_redis?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
=============================================================================== short test summary info ===============================================================================
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_myfunc_df - dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeli...
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_write_myfunc_to_redis - dagster.core.errors.DagsterInvalidConfigError: Error in conf
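下面的解决方案有效。关键问题在于:管道需要在配置中定义各个 solid,而每个 solid 的测试函数又同时传入了同一份配置和 input_values。execute_solid 会围绕被测 solid 构建一个临时管道(例如 ephemeral_get_myfunc_df_solid_pipeline)。因此,共享配置中属于其他 solid 的配置会被报告为未定义字段;已经通过 input_values 传入的输入也不能再出现在配置中,这就是报错中的 Undefined field "inputs"。我的修改是去掉 input_values 参数,改为通过运行配置传入输入。由于我的中间 solid 需要更复杂的输入对象,而我的配置文件是 yaml,因此我在所有 solid 测试中加入了以下代码: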
# Per-solid test config: start from a deep copy of the shared run_config (so
# the shared dict is not mutated) and replace its whole "solids" section.
# execute_solid() wraps the solid under test in an ephemeral pipeline that
# rejects config for solids it does not contain (e.g. get_myfunc_df), so only
# this solid's inputs may appear under "solids".
this_solid_run_config = copy.deepcopy(run_config)
# One row with key "1" and value "2". A flat list such as ['1', '2'] is a
# single column, and pandas raises ValueError when two column names are given.
input_dict = {"df": pd.DataFrame([["1", "2"]], columns=["key", "value"])}
this_solid_run_config["solids"] = {
    "myfunc_df_to_list": {
        "inputs": input_dict,
    },
}