1

作为输入,我想根据用户输入检索数据,或者如果没有给出用户输入,则从数据库中“随机”检索数据。管道的所有其他下游任务都是相同的。

因此,我想创建一个从实体 A 和 B 开始的管道,以及基于来自实体 A实体 B的输入执行的下游实体 C。
但是,当在实体 A 和 B 上使用条件输出时,不执行实体 C,因为上游固体不产生一种输入。

有没有一种我错过的简单方法?

谢谢你的帮助。

4

2 回答 2

1

除非跳过所有扇入输出,否则不会跳过“扇入”依赖项,因此这是实现此目的的一种方法。

@pipeline
def example():
  maybe_a = A()
  maybe_b = B()
  C(items=[maybe_a, maybe_b])

https://docs.dagster.io/examples/fan_in_pipeline#main

于 2020-12-11T16:37:13.603 回答
0

根据@Alex 的建议,这是一个可执行的管道:

from dagster import pipeline, solid


@solid()
def A(_):
    return


@solid()
def B(_):
    return

@solid()
def C(context, results):
    context.log.info("a returned {}".format(results[0]))
    context.log.info("b returned {}".format(results[1]))
    context.log.info("compute c")


@pipeline
def my_pipeline():
    a = A()
    b = B()
    C([a, b])
于 2020-12-11T19:36:19.633 回答