0

我当前的管道在 ParallelFor 操作中跨多个用户运行训练过程,例如:

def pipeline(run_id):
    setup_step = create_setup_step(run_id)
    
    with dsl.ParallelFor(setup_step.outputs['users']) as user:
        preprocess = create_preprocess_step(run_id, user.user_id)
        train = create_training_step(run_id, 
                                     user.user_id, 
                                     preprocess.outputs['user_data'])

    summary = create_summary_step(run_id)  # this is the component that needs to execute last

我的目标是添加在上述所有组件完成运行后执行的“总结”步骤。该组件将编译所有用户的报告,因此它不应存在于ParallelFor

每个组件的结果都被记录到数据库中,因此摘要组件通过查询数据库而不是尝试“扇入”ParallelFor 运算符来获取其数据。

我已经尝试指定在 train 步骤之后运行,create_summary_step(run_id).after(train)但是这样会在ParallelFor.

通过在运行完成后手动运行摘要组件,我取得了一些成功,例如client.wait_for_run_completion(...),但这限制了我将管道编译和上传到 EKS,这是最终目标。

4

0 回答 0