您链接到的文档状态Parallel
具有可选的进度表。它是通过使用提供的callback
关键字参数实现的multiprocessing.Pool.apply_async
:
# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1
...
class CallBack(object):
""" Callback used by parallel: it is used for progress reporting, and
to add data to be processed
"""
def __init__(self, index, parallel):
self.parallel = parallel
self.index = index
def __call__(self, out):
self.parallel.print_progress(self.index)
if self.parallel._original_iterable:
self.parallel.dispatch_next()
这是print_progress
:
def print_progress(self, index):
elapsed_time = time.time() - self._start_time
# This is heuristic code to print only 'verbose' times a messages
# The challenge is that we may not know the queue length
if self._original_iterable:
if _verbosity_filter(index, self.verbose):
return
self._print('Done %3i jobs | elapsed: %s',
(index + 1,
short_format_time(elapsed_time),
))
else:
# We are finished dispatching
queue_length = self.n_dispatched
# We always display the first loop
if not index == 0:
# Display depending on the number of remaining items
# A message as soon as we finish dispatching, cursor is 0
cursor = (queue_length - index + 1
- self._pre_dispatch_amount)
frequency = (queue_length // self.verbose) + 1
is_last_item = (index + 1 == queue_length)
if (is_last_item or cursor % frequency):
return
remaining_time = (elapsed_time / (index + 1) *
(self.n_dispatched - index - 1.))
self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
(index + 1,
queue_length,
short_format_time(elapsed_time),
short_format_time(remaining_time),
))
老实说,他们实现这一点的方式有点奇怪——它似乎假设任务总是按照它们开始的顺序完成。index
转到的变量print_progress
只是self.n_dispatched
作业实际开始时的变量。因此,启动的第一个作业总是以index
0 结束,即使说第三个作业首先完成。这也意味着他们实际上并没有跟踪已完成工作的数量。所以没有实例变量供您监控。
我认为最好的方法是制作自己的 CallBack 类和猴子补丁 Parallel:
from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed
class CallBack(object):
completed = defaultdict(int)
def __init__(self, index, parallel):
self.index = index
self.parallel = parallel
def __call__(self, index):
CallBack.completed[self.parallel] += 1
print("done with {}".format(CallBack.completed[self.parallel]))
if self.parallel._original_iterable:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.CallBack = CallBack
if __name__ == "__main__":
print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
输出:
done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
这样,每当作业完成时,您的回调就会被调用,而不是默认的。