我有一个固体需要在 2 个固体之后运行。一个将返回一个值,另一个不返回任何内容,但具有依赖实体并且需要时间来运行。
我在multiprocessing
mode中执行管道,如果实体没有定义依赖项,它们会同时运行。
以下是我正在寻找的示例情况。假设我有以下固体。
@solid(input_defs=[InputDefinition("start", Nothing)])
def solid_a(context):
import time
time.sleep(2)
context.log.info('yey')
@solid
def solid_b(context):
return 1
@composite_solid
def my_composite_solid(wait_solid_a: Nothing, solid_b_output: int):
some_other_solid(solid_b_output)
执行时,这些实体将在下面的时间线中运行。
时间飞逝 | 坚硬的 |
---|---|
0 | 管道开始... |
1秒 | solid_b 开始 |
3 秒 | solid_a 依赖实体正在运行。solid_a 还没有开始。 |
5 秒 | solid_b 完成的 |
10 秒 | solid_a 现在开始 |
15 秒 | solid_a 完成的 |
20 秒 | my_composite_solid 应该现在开始。 |
所以,根据这个时间表,为了my_composite_solid
开始,我需要两者solid_a
并solid_b
完成执行。但是,当我这样做时,dagster 会抛出一个错误:
dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'my_composite_solid' has unmapped input 'wait_solid_a'. Remove it or pass it to the appropriate solid invocation.
如果我不将solid_a
输出作为依赖my_composite_solid
项,它将在solid_b
. 我应该怎么办?