4

我已经建立了一个流程,如果 kwarg 为空,它会隐式跳过运行给定任务。

我在任务函数中使用这样的东西来跳过逻辑:

if kwargs.get('processors', Hierarchy()).__len__() == 0:
                    raise signals.SKIP('skipping task',
                                       result=Prediction())    

我想构建一些单元测试以确保跳过所述任务的最终状态。在任务级别获取状态的最简单方法是什么?

我可以从文档中看到如何获得流程而不是任务。

更新

为了补充克里斯的回应,我使用了他提出的第一个选项。由于我的流程是在测试之外定义的,因此我创建了一个简单的函数来获取一组已跳过的任务。在测试中,这与应该跳过的任务列表进行了比较:

def get_skipped_tasks(flow_state):
        return set(key.name for key, value in flow_state.result.items() if value.is_skipped())
4

1 回答 1

2

为了完整起见,我将在此处包含几种方法;对于我的示例,我将使用这个基本流程:

from prefect import task, Flow
from prefect.engine.signals import SKIP
import random


@task
def random_number():
    return random.randint(0, 100)

@task
def is_even(num):
    if num % 2:
        raise SKIP("odd number")
    return True

with Flow("dummy") as flow:
    even_task = is_even(random_number)

运行整个流程

交互式运行时,您始终可以运行整个流程并从父流程运行状态访问各个任务状态;请注意,当您“调用”任务(例如is_even(random_number))时,会创建一个副本,因此您需要正确跟踪这些副本。

flow_state = flow.run()

assert flow_state.result[even_task].is_skipped() # for example

使用模拟数据运行一段流程

交互式运行时,您还可以传递任务字典 -> 运行者将尊重的状态;可以选择为这些状态提供数据:

from prefect.engine.state import Success

mocked_state = Success(result=2)


flow_state = flow.run(task_states={random_number: mocked_state})
assert not flow_state.result[even_task].is_skipped()

使用 TaskRunner

最后,如果你想单独在这个任务上运行基于状态的测试,你可以使用TaskRunner. 这变得有点复杂,因为您必须使用Edges 重新创建上游依赖项。

from prefect.engine.task_runner import TaskRunner
from prefect.edge import Edge

runner = TaskRunner(task=even_task)
edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)


task_state = runner.run(upstream_states={edge: mocked_state})
assert not task_state.is_skipped()
于 2020-12-10T02:14:19.733 回答