假设我在管道上连接了 Dagster 中的两个固体。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了达到这个结果,我在数据满足无效条件时引发错误,因此管道停止并跳过其余的实体。
引发错误以解决我的用例似乎很棘手,有没有一种方法可以跳过管道其余部分的执行而不诉诸异常?
from dagster import solid, pipeline
@solid
def solid_1(context, x: int):
y = x + 1
if y%2 == 0:
raise "No even number is further processed"
return y
@solid
def solid_2(context, y:int):
return y**2
@pipeline
def toy_pipeline():
solid_2(solid_1())
在这个非常人为的示例中,实体 2 仅应在第一个实体的输出为奇数时执行。
在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不要将执行标记为失败,而是标记为成功。可以在每个下游实体中检查数据是否满足条件,但这很快就会增加样板。当接收数据的实体找不到要处理的数据时,最好有一种方法跳过所有下游实体的执行。