
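I created a test pipeline, but it failed partway through. I want to re-execute it programmatically, starting from the failed step and continuing from there, without re-running the earlier steps that already succeeded.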

from dagster import DagsterInstance, execute_pipeline, pipeline, solid, reexecute_pipeline
from random import random

instance = DagsterInstance.ephemeral()

@solid
def step1(context, data):
    # str(i) is required here: 'a' + i raises TypeError when i is an int
    return range(10), ('a' + str(i) for i in range(10))

@solid
def step2(context, step1op):
    x, y = step1op

    # simulation of noise
    xx = [el * (1 + 0.1 * random()) for el in x]
    xx2 = [(el - 1) / el for el in xx]
    return zip(xx, xx2), y

@solid
def step3(context, step2op):
    x, y = step2op

    ...
    return x, y


run_config = {...}


@pipeline
def inputs_pipeline():
    step3(step2(step1()))

1 Answer


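Re-executing part of a pipeline programmatically first requires the run ID of the parent run, that is, the earlier run you want to resume from: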

parent_run_id = instance.get_runs()[0].run_id
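
Note that `instance.get_runs()[0]` simply takes the first run the instance returns. If the instance holds more than one run, you may prefer to pick the failed one explicitly. Below is a minimal sketch of that selection. `latest_failed_run_id` is a hypothetical helper, not part of Dagster, and it assumes the runs are listed newest first and that each run has a `run_id` and a `status` that is either the string `"FAILURE"` or an enum whose `.value` is `"FAILURE"`. The stand-in objects are for illustration only.

```python
from types import SimpleNamespace


def latest_failed_run_id(runs):
    """Return the run_id of the first run whose status is FAILURE, else None."""
    for run in runs:
        # Accept either an enum-like status (with .value) or a plain string.
        status = getattr(run.status, "value", run.status)
        if status == "FAILURE":
            return run.run_id
    return None


# Stand-in run records for illustration only (not real Dagster objects).
example_runs = [
    SimpleNamespace(run_id="run-3", status="SUCCESS"),
    SimpleNamespace(run_id="run-2", status="FAILURE"),
    SimpleNamespace(run_id="run-1", status="FAILURE"),
]
print(latest_failed_run_id(example_runs))
```

With a real instance you would pass `instance.get_runs()` instead of `example_runs`, provided the assumptions above hold for your Dagster version.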

Then re-execute the pipeline, listing only the steps that still need to run:

result = reexecute_pipeline(inputs_pipeline, parent_run_id=parent_run_id,
                            step_keys_to_execute=['step2.compute', 'step3.compute'],
                            run_config=run_config, instance=instance)
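
The `step_keys_to_execute` list above is written out by hand. For a strictly linear pipeline like this one, it could be derived from the name of the failed solid. The following is only a sketch: `step_keys_from` is a hypothetical helper, not a Dagster function, and it assumes the solids are listed in execution order and that step keys follow the `<solid name>.compute` form used above.

```python
def step_keys_from(solid_names, failed_solid, suffix=".compute"):
    """Return step keys for failed_solid and every solid listed after it."""
    # list.index raises ValueError if failed_solid is not in solid_names.
    start = solid_names.index(failed_solid)
    return [name + suffix for name in solid_names[start:]]


# Solids of inputs_pipeline in execution order; step2 is the one that failed.
keys = step_keys_from(["step1", "step2", "step3"], "step2")
print(keys)
```

The resulting list can then be passed as `step_keys_to_execute`. For pipelines with branches, a plain ordered list is not enough and the downstream steps would have to be worked out from the dependency graph instead.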
answered 2020-11-23T14:21:16.537