事实证明,有一个实验性功能(希望将成为官方)允许基于可迭代输出的元素创建任务。下面的工作代码:
from dagster import execute_pipeline, pipeline, solid, Output, OutputDefinition
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from typing import List
@solid
def return_some_list(context):
return [1, 2, 3, 4, 5]
@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
context.log.info(str(nums))
for num in nums:
yield DynamicOutput(num, mapping_key=f'subtask_{num}')
@solid
def print_num(context, some_num: int):
context.log.info(str(some_num))
return some_num
@pipeline
def some_pipeline():
output_list = return_some_list()
generate_subtasks(output_list).map(print_num)
if __name__ == "__main__":
result = execute_pipeline(some_pipeline)
在这里,return_some_list
返回一个可迭代的。我们想为这个可迭代的每个元素运行一个实体。我们在 solid 中执行此操作generate_subtasks
,这会产生一个DynamicOutput
带有元素的 a 和一个将为它生成的子任务的名称。的DynamicOutput
类型信息DynamicOutputDefinition
在solid
规范中给出。
为了连接这些实体,我们首先通过 获取列表return_some_list
。然后调用generate_subtasks
作为生成器的 ,并将函数map
输出到它的每个输出print_num
。
运行整个管道应该为 生成的每个子任务打印大量信息,generate_subtasks
如下所示(仅显示部分输出):
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".
哦,还有一件很酷的事情:Dagster 执行类型检查,如果你给它一个错误类型的参数,它会很快失败。因此,如果我们要向函数提供print_str
,map
它甚至会拒绝运行。