2

我有一个固体需要在 2 个固体之后运行。一个将返回一个值,另一个不返回任何内容,但具有依赖实体并且需要时间来运行。

我在multiprocessingmode中执行管道,如果实体没有定义依赖项,它们会同时运行。

以下是我正在寻找的示例情况。假设我有以下固体。

@solid(input_defs=[InputDefinition("start", Nothing)])
def solid_a(context):
    import time
    time.sleep(2)
    context.log.info('yey')

@solid
def solid_b(context):
    return 1

@composite_solid
def my_composite_solid(wait_solid_a: Nothing, solid_b_output: int):
    some_other_solid(solid_b_output)

执行时,这些实体将在下面的时间线中运行。

时间飞逝 坚硬的
0 管道开始...
1秒 solid_b开始
3 秒 solid_a依赖实体正在运行。solid_a还没有开始。
5 秒 solid_b完成的
10 秒 solid_a现在开始
15 秒 solid_a完成的
20 秒 my_composite_solid 应该现在开始

所以,根据这个时间表,为了my_composite_solid开始,我需要两者solid_asolid_b完成执行。但是,当我这样做时,dagster 会抛出一个错误:

dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'my_composite_solid' has unmapped input 'wait_solid_a'. Remove it or pass it to the appropriate solid invocation.

如果我不将solid_a输出作为依赖my_composite_solid项,它将在solid_b. 我应该怎么办?

4

0 回答 0