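After looking through the dagster codebase for the error you hit, I found where it comes from. It confirms what I had read in the tutorial: "Output names must be unique."
Since you declare the Output inside a for loop and get this error, your Output object names are most likely not unique.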
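One quick way to confirm this is to check the names for duplicates before they are yielded. The following is a minimal sketch in plain Python, not Dagster API: `find_duplicate_names` is a hypothetical helper, and the sample values are made up for illustration.

```python
from collections import Counter
from typing import Iterable, List


def find_duplicate_names(names: Iterable[str]) -> List[str]:
    """Return the names that occur more than once, in first-seen order."""
    counts = Counter(names)
    return [name for name, count in counts.items() if count > 1]


# Hypothetical example: a loop that derives output names from values,
# where the value 3 appears twice and so produces a repeated name.
values = [3, 7, 3, 9]
names = [f"output_{value}" for value in values]
duplicates = find_duplicate_names(names)
print(duplicates)
```

If the list that comes back is not empty, the loop is producing at least one repeated output name.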
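Update: Following your outreach to the dagster team by opening an issue, I tested the idea of creating outputs dynamically at runtime. It does work if you generate the dynamic data outside the @solid. What I did find is that when I built the dynamic data inside one @solid, intending to use its output as the solid config input of a successor, the successor @solid did not pick up the updated structure. The result was a dagster.core.errors.DagsterInvariantViolationError.
Below is the code I used to verify that outputs can be yielded dynamically at runtime when the dynamic data generation happens outside the solids. I suspect this is a bit of an anti-pattern, though perhaps not yet, if Dagster has not reached the maturity needed to handle the scenario you describe. Note also that I did not handle doing anything with all of the yielded Output objects.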
"""dagit -f dynamic_output_at_runtime.py -n dynamic_output_at_runtime"""
import random

from dagster import (
    Output,
    OutputDefinition,
    SystemComputeExecutionContext,
    execute_pipeline,
    pipeline,
    solid,
)

# Build a different list of OutputDefinitions on each execution.
# Collecting the ints in a set keeps the generated output names unique.
start = 1
stop = 100
limit = random.randint(1, 10)
random_set_of_ints = {random.randint(start, stop) for _ in range(limit)}

output_defs_runtime = [
    OutputDefinition(name=f"output_{num}") for num in random_set_of_ints
]


@solid(output_defs=output_defs_runtime)
def ints_for_all(context: SystemComputeExecutionContext):
    # Yield one Output for each OutputDefinition declared above.
    for num in random_set_of_ints:
        out_name = f"output_{num}"
        context.log.info(f"output object name: {out_name}")
        yield Output(num, out_name)


@pipeline
def dynamic_output_at_runtime():
    x = ints_for_all()
    print(x)  # shows the output handle(s) when the pipeline is composed


if __name__ == "__main__":
    result = execute_pipeline(dynamic_output_at_runtime)
    assert result.success
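The script above keeps its output names unique by collecting the random integers in a set before building the names. A stdlib-only sketch of just that step follows. `make_output_names`, its `seed` parameter, and the sorting of the result are hypothetical additions for illustration; they are not part of the original script or of Dagster.

```python
import random
from typing import List, Optional


def make_output_names(start: int, stop: int, limit: int,
                      seed: Optional[int] = None) -> List[str]:
    """Draw `limit` random ints in [start, stop]; name one output per distinct int."""
    rng = random.Random(seed)
    # A set drops repeated draws, so there may be fewer names than `limit`,
    # but no two names can collide.
    distinct = {rng.randint(start, stop) for _ in range(limit)}
    return [f"output_{num}" for num in sorted(distinct)]


names = make_output_names(start=1, stop=100, limit=10, seed=0)
print(names)
```

Because repeated draws collapse into one set element, the number of names can be smaller than `limit`, but the names themselves never repeat.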
Re-running this pipeline produces a different set of outputs each time:
python dynamic_output_at_runtime.py
_ints_for_all_outputs(output_56=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea160>, output_8=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea198>, output_58=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea1d0>, output_35=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea208>)
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_START - Started execution of pipeline "dynamic_output_at_runtime".
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Executing steps in process (pid: 9456)
event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_START - Started execution of step "ints_for_all.compute".
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_56
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_56" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_56"], "type_check_data": [true, "output_56", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_8
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_8" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_8"], "type_check_data": [true, "output_8", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_58
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_58" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_58"], "type_check_data": [true, "output_58", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_35
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_35" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_35"], "type_check_data": [true, "output_35", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_SUCCESS - Finished execution of step "ints_for_all.compute" in 2.17ms.
event_specific_data = {"duration_ms": 2.166192003642209}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Finished steps in process (pid: 9456) in 3.11ms
event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_SUCCESS - Finished execution of pipeline "dynamic_output_at_runtime".
I hope this helps!