3

假设我在管道上连接了 Dagster 中的两个固体。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了达到这个结果,我在数据满足无效条件时引发错误,因此管道停止并跳过其余的实体。

引发错误以解决我的用例似乎很棘手,有没有一种方法可以跳过管道其余部分的执行而不诉诸异常?

from dagster import solid, pipeline

@solid
def solid_1(context, x: int):
    y = x + 1

    if y%2 == 0:
        raise "No even number is further processed"

    return y

@solid
def solid_2(context, y:int):
    return y**2

@pipeline
def toy_pipeline():
    solid_2(solid_1())

在这个非常人为的示例中,实体 2 仅应在第一个实体的输出为奇数时执行。

在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不要将执行标记为失败,而是标记为成功。可以在每个下游实体中检查数据是否满足条件,但这很快就会增加样板。当接收数据的实体找不到要处理的数据时,最好有一种方法跳过所有下游实体的执行。

4

1 回答 1

4

为了实现您想要的行为,可以使用is_required=False相应的参数将输出标记为可选OutputDefinition。这意味着输出不一定必须由实体产生。

如果未产生可选输出,则依赖于输出的所有下游固体将简单地跳过。这对于短路管道(这是您的用例)或更复杂的分支逻辑都很有用。跳过实体时,管道运行不会标记为失败。

您正在使用类型提示来定义输入和输出类型,但由于您需要指定is_required参数,因此您需要使用显式的OuputDefinition.

from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
from typing import List

def query_db():
    return []

@solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
def solid_1(context):
    rows = query_db()

    if len(rows) > 0:
        yield Output(rows, output_name="data")


@solid
def solid_2(context, data: List[int]):
    context.log.info(str(data))
    pass


@pipeline
def my_pipeline():
    solid_2(solid_1())

solid_2也可以使用InputDefinition代替类型提示来定义实体。类型提示是语法糖InputDefinitions

@solid(input_defs=[InputDefinition('data', List[int])])
def solid_2(context, data):
    context.log.info(str(data))
    # Process data
    pass

附带说明:通常,异常是将实体标记为失败的正确方法,并且在 Dagster 代码中不被视为 hacky。

于 2020-05-26T15:33:08.743 回答