我创建了一个测试管道,但它中途失败了。我想以编程方式重新执行它,但从管道的失败步骤开始并继续前进。我不想重复执行之前的成功步骤。
from dagster import DagsterInstance, execute_pipeline, pipeline, solid, reexecute_pipeline
from random import random
instance = DagsterInstance.ephemeral()
@solid
def step1(context, data):
return range(10), ('a' + i for i in range(10))
@solid
def step2(context, step1op):
x,y = step1op
# simulation of noise
xx = [el * (1 + 0.1 * random()) for el in x]
xx2 = [(el - 1)/el for el in xx]
return zip(xx, xx2), y
@solid
def step3(context, step2op):
x, y = step2op
...
return x, y
run_config = {...}
@pipeline
def inputs_pipeline():
step3(step2(step1()))