1

我有三个函数使用 papermill 执行 3 个不同的 jupyter 笔记本,我希望第一个 (job1) 和第二个 (job2) 函数同时运行,最后一个函数 (job3) 仅在第一个函数 (job1) 完成运行后运行任何错误。我不确定为第二个函数创建一个新线程或如何正确使用 join() 方法是否有意义。我在 Windows 上运行,由于某种原因 concurrent.futures 和多处理不起作用,这就是我使用线程模块的原因。

def job1():

    return pm.execute_notebook('notebook1.ipynb',log_output=False)

def job2():

     return pm.execute_notebook('notebook2.ipynb',log_output=False)

def job3():

     return pm.execute_notebook('notebook3.ipynb',log_output=False)


t1 = threading.Thread(target = job1)
t2 = threading.Thread(target = job2)
t3 = threading.Thread(target = job3)


try:
   t1.start()
   t1.join()
   t2.start()

except:
   pass

finally:

   t3.start()
4

2 回答 2

0

我喜欢从可视化所需的流程开始,我理解它看起来像:

在此处输入图像描述

这意味着 t1 和 t2 需要同时启动,然后您需要同时加入:

   t1.start() # <- Started 
   t2.start() # <- Started
   # t1 and t2 executing concurrently

   t1.join()
   t2.join()
   # wait for both to finish

   t3.start()
   t3.join()

t1, t2 连接顺序并不重要,因为您的程序无论如何都必须等待运行时间最长的线程。如果 t1 先完成,它将在 t2 上阻塞,如果 t2 先完成,它仍然需要等待 t1,然后将在 t2.join() 上“无操作”。

于 2020-04-23T15:43:50.963 回答
0

我编写了一个程序来并行运行 DAG,这正是您所需要的。这是作为开源项目的一部分完成的。该解决方案使用进程而不是线程,但您可以对其进行调整。基本思想是拥有一个进程池并跟踪每个任务的状态,每当任何任务完成时,都会发送下一个下一个任务。顺序是通过按拓扑顺序迭代 DAG 来确定的。

如果任何任务失败,它们的所有下游依赖项都将中止。程序继续运行,直到没有更多任务要运行,代码在这里:https ://github.com/ploomber/ploomber/blob/0.4.1/src/ploomber/executors/Parallel.py

我看到您正在尝试在 papermill 中运行笔记本,我开发的工具完全支持这一点:它可以运行任意复杂的管道(其中任务可以参数化 Jupyter 笔记本)。看起来适合您的用例:https ://github.com/ploomber/ploomber

于 2020-05-21T01:06:58.403 回答