2

我在 Kedro 有一个管道,如下所示:

from kedro.pipeline import Pipeline, node
from .nodes import *

def foo():
    return Pipeline([
        node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a"), name="A"),
        node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b"), name="B"),
        node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c"), name="C"),
        node(d, inputs=["train_x", "test_x"], outputs=dict(bar_d="bar_d"), name="D"),
        
    ])

节点 A、B 和 C 不是很占用资源,但它们需要一段时间,所以我想并行运行它们,另一方面,节点 D 几乎使用了我所有的内存,如果它会失败与其他节点一起执行。有没有办法告诉 Kedro 在执行节点 D 之前等待 A、B 和 C 完成并保持代码有条理?

4

1 回答 1

3

Kedro 根据不同节点的输入/输出之间的相互依赖关系确定执行顺序。在您的情况下,节点 D 不依赖于任何其他节点,因此无法保证执行顺序。同样,如果使用并行运行器,也不能保证节点 D不会与 A、B 和 C 并行运行。

也就是说,有几种解决方法可以用来实现特定的执行顺序。

1 [Preferred] 单独运行节点

而不是做kedro run --parallel,你可以做:

kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D

这可以说是首选的解决方案,因为它不需要更改代码(如果您曾经在不同的机器上运行相同的管道,这很好)。如果您希望节点 D 仅在 A、B 和 C 成功时运行,&&您可以这样做。;如果运行逻辑变得更复杂,您可以将其存储在 Makefile/bash 脚本中。

2 使用虚拟输入/输出

您还可以通过引入虚拟数据集来强制执行顺序。就像是:

def foo():
    return Pipeline([
        node(a, inputs=["train_x", "test_x"], outputs=[dict(bar_a="bar_a"), "a_done"], name="A"),
        node(b, inputs=["train_x", "test_x"], outputs=[dict(bar_b="bar_b"), "b_done"], name="B"),
        node(c, inputs=["train_x", "test_x"], outputs=[dict(bar_c="bar_c"), "c_done"], name="C"),
        node(d, inputs=["train_x", "test_x", "a_done", "b_done", "c_done"], outputs=dict(bar_d="bar_d"), name="D"),     
    ])

空列表可以用于虚拟数据集。底层函数还必须返回/获取附加参数。

这种方法的优点是kedro run --parallel会立即产生所需的执行逻辑。缺点是污染了节点和底层函数的定义。

如果你走这条路,你还必须决定是否要将虚拟数据集存储在数据目录中(污染更多,但允许节点 D 自己运行)或不(节点 D 不能在其上运行)自己的)。


相关讨论 [ 1 , 2 ]

于 2021-07-19T16:49:31.123 回答