5

我正在尝试学习该模块作为pythonjoblib中内置模块的替代方案。multiprocessing我习惯于multiprocessing.imap在可迭代对象上运行函数并在结果进入时返回结果。在这个最小的工作示例中,我不知道如何使用 joblib:

import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:

    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x

哪个打印:

Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2

我想看看输出:

Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2

或类似的东西,表明迭代MP(...)没有等待所有结果完成。对于更长的演示更改n_jobs=-1range(100).

4

3 回答 3

6

stovfl 的回答很优雅,但它只适用于第一批发货。在示例中,它之所以有效,是因为工人永远不会挨饿(n_tasks < 2*n_jobs)。要使这种方法起作用,apply_async还必须调用最初传递给的回调。这是 的一个实例BatchCompletionCallBack,它安排下一批要处理的任务。

一种可能的解决方案是将任意回调包装在可调用对象中,如下所示(在 joblib==0.11,py36 中测试):

from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))

输出

Inside function 0
Inside function 1
    ImmediateResult function [0]
    ImmediateResult function [1]
Inside function 3
Inside function 2
    ImmediateResult function [3]
    ImmediateResult function [2]
Inside function 4
    ImmediateResult function [4]
Inside function 5
    ImmediateResult function [5]
于 2018-09-15T18:16:23.080 回答
3

要从joblib获得即时结果,例如:

from joblib._parallel_backends import MultiprocessingBackend

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Overload apply_async and set callback=self.callback
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))

输出
注意:我使用time.sleep(n * random.randrange(1,5))in def hello(...),因此processes准备好不同。

内部函数 0
内部函数 1
ImmediateResult 函数 [0]
内部函数 2
ImmediateResult 函数 [1]
ImmediateResult 函数 [2]
外部函数 0
外部函数 1
外部函数 2

用 Python:3.4.2 测试 - joblib:0.11

于 2017-03-31T14:30:42.297 回答
-1
>>> import joblib, time
>>> 
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
... 
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     res = MP(func(x) for x in range(3))  # This is not an iterator.
... 
Inside function 0
Inside function 1
Inside function 2
>>> type(res)
<type 'list'>

您正在处理的不是发电机。因此,您不应期望它会为您提供中间结果。我在文档中阅读的任何内容似乎都没有提及(或者我没有阅读相关部分)。

欢迎您阅读文档并搜索“中间”结果主题: https ://pythonhosted.org/joblib/search.html?q=intermediate&check_keywords=yes&area=default

我的理解是,每次调用parallel都是一个障碍,为了获得中间结果,您需要对处理进行分块:

>>> import joblib, time
>>> 
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
... 
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     for chunk in range(3):
...         x = MP(func(y) for y in [chunk])
...         print "Outside function", x
... 
Inside function 0
Outside function [0]
Inside function 1
Outside function [1]
Inside function 2
Outside function [2]
>>> 

如果你想获得技术,有一个回调机制,但它专门用于进度报告(BatchCompletionCallBack),但你需要更多涉及的代码更改。

于 2017-03-29T05:30:24.063 回答