1

可以使用 joblib 对 python 中的函数进行多次调用。

from joblib import Parallel, delayed 

def normal(x):
    print "Normal", x
    return x**2

if  __name__ == '__main__':

    results = Parallel(n_jobs=2)(delayed(normal)(x) for x in range(20))
    print results

给出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

但是,我真正想要的是在类实例列表上并行调用类函数。该函数只是存储一个类变量。然后稍后我将访问此变量。

from joblib import Parallel, delayed 

class A(object):
    def __init__(self, x):
        self.x = x
    def p(self):
        self.y = self.x**2

if  __name__ == '__main__':

    runs = [A(x) for x in range(20)]
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
    for run in runs:
        print run.y

这给出了一个错误:

回溯(最近一次通话最后):

文件“”,第 1 行,在 runfile('G:/My Drive/CODE/stackoverflow/parallel_classfunc/parallel_classfunc.py', wdir='G:/My Drive/CODE/stackoverflow/parallel_classfunc')

文件“C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py”,第 710 行,运行文件 execfile(文件名,命名空间)

文件“C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py”,第 86 行,在 execfile exec(compile(scripttext, filename, 'exec'), glob, loc)

文件“G:/My Drive/CODE/stackoverflow/parallel_classfunc/parallel_classfunc.py”,第 12 行,并行(n_jobs=4)(延迟(run.p() 用于在运行中运行))

文件“C:\ProgramData\Anaconda2\lib\site-packages\joblib\parallel.py”,第 183 行,延迟 pickle.dumps(function)

文件“C:\ProgramData\Anaconda2\lib\copy_reg.py”,第 70 行,在 _reduce_ex 中引发 TypeError,“can't pickle %s objects”% base。姓名

TypeError:无法腌制生成器对象

如何将 joblib 与这样的类一起使用?还是有更好的方法?

4

2 回答 2

4

怎么可能joblib与这样的类一起使用?

让我们首先提出一些代码抛光:

并非所有东西都适合joblib.Parallel()( delayed() )吞咽的呼号功能:

# >>> type( runs )                        <type 'list'>
# >>> type( runs[0] )                     <class '__main__.A'>
# >>> type( run.p() for run in runs )     <type 'generator'>

所以,让我们让 DEMO 对象“通过” aContainerFUN()

StackOverflow_DEMO_joblib.Parallel.py

from sklearn.externals.joblib import Parallel, delayed
import time

class A( object ):

    def __init__( self, x ):
        self.x = x
        self.y = "Defined on .__init__()"

    def p(        self ):
        self.y = self.x**2

def aNormalFUN( aValueOfX ):
    time.sleep( float( aValueOfX ) / 10. )
    print ": aNormalFUN() has got aValueOfX == {0:} to process.".format( aValueOfX )
    return aValueOfX * aValueOfX

def aContainerFUN( aPayloadOBJECT ):
    time.sleep( float( aPayloadOBJECT.x ) / 10. )
    # try: except: finally:
    pass;  aPayloadOBJECT.p()
    print  "| aContainerFUN: has got aPayloadOBJECT.id({0:}) to process. [ Has made .y == {1:}, given .x == {2: } ]".format( id( aPayloadOBJECT ), aPayloadOBJECT.y, aPayloadOBJECT.x )
    time.sleep( 1 )

if __name__ == '__main__':
     # ------------------------------------------------------------------
     results = Parallel( n_jobs = 2
                         )(       delayed( aNormalFUN )( aParameterX )
                         for                             aParameterX in range( 11, 21 )
                         )
     print results
     print '.'
     # ------------------------------------------------------------------
     pass;       runs = [ A( x ) for x in range( 11, 21 ) ]
     # >>> type( runs )                        <type 'list'>
     # >>> type( runs[0] )                     <class '__main__.A'>
     # >>> type( run.p() for run in runs )     <type 'generator'>

     Parallel( verbose = 10,
               n_jobs  = 2
               )(        delayed( aContainerFUN )( run )
               for                                 run in runs
               )

结果 ?像魅力一样工作!

C:\Python27.anaconda> python StackOverflow_DEMO_joblib.Parallel.py
: aNormalFUN() has got aValueOfX == 11 to process.
: aNormalFUN() has got aValueOfX == 12 to process.
: aNormalFUN() has got aValueOfX == 13 to process.
: aNormalFUN() has got aValueOfX == 14 to process.
: aNormalFUN() has got aValueOfX == 15 to process.
: aNormalFUN() has got aValueOfX == 16 to process.
: aNormalFUN() has got aValueOfX == 17 to process.
: aNormalFUN() has got aValueOfX == 18 to process.
: aNormalFUN() has got aValueOfX == 19 to process.
: aNormalFUN() has got aValueOfX == 20 to process.
[121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
.
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 121, given .x ==  11 ]
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 144, given .x ==  12 ]
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    2.4s
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 169, given .x ==  13 ]
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 196, given .x ==  14 ]
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    4.9s
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 225, given .x ==  15 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 256, given .x ==  16 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 289, given .x ==  17 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 324, given .x ==  18 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 361, given .x ==  19 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 400, given .x ==  20 ]
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:   13.3s finished
于 2018-06-05T15:57:19.553 回答
0

让第一个调整类 a/c 到第一个函数:

class A(object):
    def __init__(self, x):
        self.x = x
    def p(self):
        self.y = self.x**2
        return self.y

现在要并行运行上述类,只需使用 lambda 函数而不是直接调用它(run.p())。

from joblib import Parallel, delayed 

class A(object):
    def __init__(self, x):
        self.x = x
    def p(self):
        self.y = self.x**2
        return self.y

if  __name__ == '__main__':
    runs = [A(x) for x in range(20)]
    with Parallel(n_jobs=6, verbose=5) as parallel:
        delayed_funcs = [delayed(lambda x:x.p())(run) for run in runs]
        run_A = parallel(delayed_funcs)

    print(run_A)

您的输出如下所示:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done   6 tasks      | elapsed:    0.0s
[Parallel(n_jobs=6)]: Done  14 out of  20 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=6)]: Done  20 out of  20 | elapsed:    0.0s finished
于 2021-12-23T14:21:02.720 回答