2

我一直在尝试运行参数化和连接的流

基本上,有一个任务返回一个参数字典 ( make_poem),以及一个基于参数 ( declaim_poem) 打印某些内容的流程。还有以下父流程:

flow_run = StartFlowRun(flow_name="declaim_flow", project_name=project_name)
with Flow("param_flow") as flow:
    flow_run.set_upstream(task(make_poem), key="parameters")

这比跑了,结果如下:

[2021-08-01 17:04:05+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'param_flow'
[2021-08-01 17:04:05+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'param_flow'
[2021-08-01 17:04:05+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'param_flow'
[2021-08-01 17:04:05+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'param_flow'
[2021-08-01 17:04:05+0200] INFO - prefect.TaskRunner | Task 'make_poem': Starting task run...
[2021-08-01 17:04:05+0200] INFO - prefect.TaskRunner | Task 'make_poem': Finished task run for task with final state: 'Success'
[2021-08-01 17:04:05+0200] INFO - prefect.TaskRunner | Task 'Flow declaim_flow': Starting task run...
[2021-08-01 17:04:06+0200] INFO - prefect.Flow declaim_flow | Flow Run: < cloud link >
[2021-08-01 17:04:06+0200] INFO - prefect.TaskRunner | Task 'Flow declaim_flow': Finished task run for task with final state: 'Success'
[2021-08-01 17:04:06+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

即使这是一个成功的结果,我也无法从declaim_poem(如暗示的那样,一首诗)中得到预期的输出以在屏幕上打印出来。根据此页面,我正在使用在declaim_poem任务内部实例化的记录器,而不是流。

在提供的 Prefect Cloud 链接中,我也找不到这首诗,只有来自所有流程的所有任务的成功结果。

关于我做错了什么的任何想法?

4

1 回答 1

0

试试这个,不同的 API,但做StartRunFlow与输出 sub_flow 日志相同的事情。

from prefect import Flow

from prefect.tasks.prefect.flow_run import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:

    flow_1_run_id = create_flow_run(flow_name="sub_flow_1")
    wait_for_flow_run(flow_1_run_id, stream_logs=True)

    flow_2_run_id = create_flow_run(flow_name="sub_flow_2")
    wait_for_flow_run(flow_2_run_id, stream_logs=True)

参考资料:https ://www.prefect.io/blog/prefect-0-15-0-a-new-flow-run-experience/

于 2021-11-29T04:02:58.793 回答