为了完整起见,我将在此处包含几种方法;对于我的示例,我将使用这个基本流程:
from prefect import task, Flow
from prefect.engine.signals import SKIP
import random
@task
def random_number():
return random.randint(0, 100)
@task
def is_even(num):
if num % 2:
raise SKIP("odd number")
return True
with Flow("dummy") as flow:
even_task = is_even(random_number)
运行整个流程
交互式运行时,您始终可以运行整个流程并从父流程运行状态访问各个任务状态;请注意,当您“调用”任务(例如is_even(random_number)
)时,会创建一个副本,因此您需要正确跟踪这些副本。
flow_state = flow.run()
assert flow_state.result[even_task].is_skipped() # for example
使用模拟数据运行一段流程
交互式运行时,您还可以传递任务字典 -> 运行者将尊重的状态;可以选择为这些状态提供数据:
from prefect.engine.state import Success
mocked_state = Success(result=2)
flow_state = flow.run(task_states={random_number: mocked_state})
assert not flow_state.result[even_task].is_skipped()
使用 TaskRunner
最后,如果你想单独在这个任务上运行基于状态的测试,你可以使用TaskRunner
. 这变得有点复杂,因为您必须使用Edge
s 重新创建上游依赖项。
from prefect.engine.task_runner import TaskRunner
from prefect.edge import Edge
runner = TaskRunner(task=even_task)
edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)
task_state = runner.run(upstream_states={edge: mocked_state})
assert not task_state.is_skipped()