50

Is there a simple way to track the overall progress of a joblib.Parallel execution?

I have a long-running execution composed of thousands of jobs, which I want to track and record in a database. However, to do this, whenever Parallel finishes a task, I need it to execute a callback, reporting how many jobs are remaining.

I've accomplished a similar task before with Python's stdlib multiprocessing.Pool, by launching a thread that records the number of pending jobs in Pool's job list.

Looking at the code, Parallel inherits Pool, so I figured I could pull off the same trick, but it doesn't seem to use those lists, and I haven't been able to figure out any other way to "read" its internal status.
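The Pool-monitoring trick described above can be sketched roughly like this (a hypothetical illustration; `pool._cache` is a private, version-dependent CPython attribute that holds pending results, so this is not a supported API):

```python
from multiprocessing.pool import ThreadPool
import threading
import time

def work(x):
    time.sleep(0.01)
    return x * x

def monitor(pool, interval=0.05):
    # pool._cache maps job ids to pending result objects
    # (private CPython implementation detail; may change between versions)
    while pool._cache:
        print("pending jobs:", len(pool._cache))
        time.sleep(interval)

pool = ThreadPool(4)
results = [pool.apply_async(work, (i,)) for i in range(100)]
reporter = threading.Thread(target=monitor, args=(pool,))
reporter.start()
values = [r.get() for r in results]
reporter.join()
pool.close()
pool.join()
```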


8 Answers

57

One step further than dano's and Connor's answers is to wrap the whole thing as a context manager:

import contextlib
import joblib
from tqdm import tqdm

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()

Then you can use it like this, and no monkey-patched code is left behind once you're done:

from math import sqrt
from joblib import Parallel, delayed

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

I think it's awesome, and it looks similar to the tqdm pandas integration.

answered 2019-11-19T14:50:30.287
28

Why can't you simply use tqdm? The following worked for me:

from joblib import Parallel, delayed
from tqdm import tqdm

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000)))
100%|██████████| 1000/1000 [00:00<00:00, 10563.37it/s]
answered 2018-04-20T22:59:25.860
21

The documentation you linked to states that Parallel has an optional progress meter. It's implemented by using the callback keyword argument provided by multiprocessing.Pool.apply_async:

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

And here's print_progress:

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

Honestly, though, the way they implement it is kind of weird; it seems to assume tasks will always be completed in the order they're started. The index variable that goes to print_progress is just the value of self.n_dispatched at the time the job was actually started. So the first job launched will always finish with an index of 0, even if, say, the third job finishes first. It also means they don't actually keep track of the number of completed jobs, so there's no instance variable for you to monitor.
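The completion-order point can be demonstrated with a stdlib sketch (not joblib itself): apply_async completion callbacks fire in the order jobs finish, which need not match the order they were dispatched.

```python
from multiprocessing.pool import ThreadPool
import threading
import time

completion_order = []
lock = threading.Lock()

def job(i):
    # later-dispatched jobs are deliberately made to finish sooner
    time.sleep((5 - i) * 0.05)
    return i

def on_done(result):
    # fires in completion order, not dispatch order
    with lock:
        completion_order.append(result)

pool = ThreadPool(5)
for i in range(5):
    pool.apply_async(job, (i,), callback=on_done)
pool.close()
pool.join()
print(completion_order)  # e.g. [4, 3, 2, 1, 0]
```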

I think your best bet is to make your own CallBack class, and monkey patch Parallel:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

Output:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

This way, your callback gets called whenever a job completes, instead of the default one.

answered 2014-07-27T18:24:14.060
10

Expanding on dano's answer for the newest version of the joblib library. There were a couple of changes to the internal implementation.

from joblib import Parallel, delayed
from collections import defaultdict

# patch joblib progress callback
class BatchCompletionCallBack(object):
  completed = defaultdict(int)

  def __init__(self, time, index, parallel):
    self.index = index
    self.parallel = parallel

  def __call__(self, index):
    BatchCompletionCallBack.completed[self.parallel] += 1
    print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
    if self.parallel._original_iterator is not None:
      self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
answered 2017-01-23T20:28:29.043
5

TLDR solution:

Works with joblib 0.14.0 and tqdm 4.46.0 using python 3.5. Credits to frenzykryger for the contextlib suggestion, and to dano and Connor for the monkey patching idea.

import contextlib
import joblib
from tqdm import tqdm
from joblib import Parallel, delayed

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""

    def tqdm_print_progress(self):
        if self.n_completed_tasks > tqdm_object.n:
            n_completed = self.n_completed_tasks - tqdm_object.n
            tqdm_object.update(n=n_completed)

    original_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = tqdm_print_progress

    try:
        yield tqdm_object
    finally:
        joblib.parallel.Parallel.print_progress = original_print_progress
        tqdm_object.close()

You can use it the same way as described by frenzykryger:

import time
def some_method(wait_time):
    time.sleep(wait_time)

with tqdm_joblib(tqdm(desc="My method", total=10)) as progress_bar:
    Parallel(n_jobs=2)(delayed(some_method)(0.2) for i in range(10))

Longer explanation:

Jon's solution is simple to implement, but it only measures the dispatched tasks. If a task takes a long time, the bar will be stuck at 100% while waiting for the last dispatched task to finish executing.
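The dispatched-vs-completed distinction can be illustrated with a stdlib sketch (no joblib or tqdm involved): a counter over the input iterable reaches its maximum at submission time, long before a completion-side counter does.

```python
import concurrent.futures
import time

def slow(x):
    time.sleep(0.05)
    return x

dispatched = 0
completed = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    futures = []
    for i in range(8):
        dispatched += 1            # what a bar over the input iterable measures
        futures.append(ex.submit(slow, i))
    # submission is nearly instant: all 8 are dispatched, none counted as done yet
    dispatched_snapshot = (dispatched, completed)
    for f in concurrent.futures.as_completed(futures):
        f.result()
        completed += 1             # what a completion callback measures

print(dispatched_snapshot)  # (8, 0)
print(completed)            # 8
```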

frenzykryger's context manager approach, improved from dano and Connor, is better, but BatchCompletionCallBack can also be called with ImmediateResult before a task completes (see Intermediate results from joblib). This can get us a count that exceeds 100%.

Instead of monkey patching BatchCompletionCallBack, we can just patch print_progress; BatchCompletionCallBack already calls print_progress anyway. If verbose is set (e.g. Parallel(n_jobs=2, verbose=100)), print_progress will print out completed tasks, though not as nicely as tqdm. Looking at the code, print_progress is a method on Parallel, so it already has self.n_completed_tasks, which logs the number we want. All we have to do is compare it against the current state of joblib's progress and update only when there is a difference.

This was tested in joblib 0.14.0 and tqdm 4.46.0 using python 3.5.

answered 2020-05-08T22:32:26.563
4

Text progress bar

Here's one more variant, for those who want a text progress bar without additional modules like tqdm. Works with joblib 0.11 and python 3.5.2 on Linux as of 16.04.2018, and shows progress upon subtask completion.

Redefine the native class:

class BatchCompletionCallBack(object):
    # Added code - start
    global total_n_jobs
    # Added code - end
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp

        self.parallel._backend.batch_completed(self.batch_size,
                                           this_batch_duration)
        self.parallel.print_progress()
        # Added code - start
        progress = self.parallel.n_completed_tasks / total_n_jobs
        print(
            "\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50), progress*100)
            , end="", flush=True)
        if self.parallel.n_completed_tasks == total_n_jobs:
            print('\n')
        # Added code - end
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()

import joblib.parallel
import time
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack

Before use, define the global constant with the total number of jobs:

total_n_jobs = 10

This will result in something like:

Progress: [########################################          ] 80.0%
answered 2018-04-16T13:13:25.780
1

Here's another answer to your question, with the following syntax:

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

https://stackoverflow.com/a/40415477/232371

answered 2016-11-04T05:35:42.080
1

In Jupyter, plain tqdm starts a new output line every time it updates, so for Jupyter Notebook use tqdm.notebook instead.

Without time.sleep:

from joblib import Parallel, delayed
from tqdm import notebook

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in notebook.tqdm(range(1000)))

100% 1000/1000 [00:06<00:00, 143.70it/s]

With time.sleep:

from joblib import Parallel, delayed
from tqdm import notebook
from random import randint
import time

def myfun(x):
    time.sleep(randint(1, 5))
    return x**2

results = Parallel(n_jobs=7)(delayed(myfun)(i) for i in notebook.tqdm(range(100)))

What I'm currently using instead of joblib.Parallel:

import concurrent.futures
from tqdm import notebook
from random import randint
import time

iterable = [i for i in range(50)]

def myfun(x):
    time.sleep(randint(1, 5))
    return x**2

def run(func, iterable, max_workers=8):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(notebook.tqdm(executor.map(func, iterable), total=len(iterable)))
    return results

run(myfun, iterable)
answered 2019-05-31T10:21:26.103