
I have been trying to parallelise the whole function when main calls it, or, failing that, any part of the function shown below, but I cannot get rid of TypeError: function object is not iterable. Any advice is appreciated.

import multiprocessing
import numpy as np
from joblib import Parallel, delayed
from multiprocessing import Pool
from scipy.cluster.vq import kmeans, vq   # kmeans / vq presumably from scipy

num_cores = multiprocessing.cpu_count()
parallel = Parallel(n_jobs=num_cores)
p = Pool(4)

def kmean(layerW,cluster):
    weights1d = np.reshape(layerW,-1)
    print(np.shape(weights1d))

    #Parallelizing Here
    centroids,_ = parallel(delayed(kmeans(weights1d, cluster)))
    idxs,_      = parallel(delayed(vq(weights1d,centroids)))

    #Here, using Parallel
    weights1d_q = parallel(delayed([centroids[idxs[i]] for i in range(len(weights1d))]))

    #OR --- using pool instead
    weights1d_q = p.map([centroids[idxs[i]] for i in range(len(weights1d))])
    weights4d_q  = np.reshape(weights1d_q, np.shape(layerW))
    return weights4d_q

1 Answer


"I cannot get rid of TypeError: function object is not iterable"

As for the TypeError:

The TypeError exception is thrown correctly, due to wrong syntax: a malformed call to the joblib.Parallel( delayed( ... ) ... ) constructor that does not obey the documented calling syntax.

Example 1: a correct call:
This call follows the documented syntax specification down to the last dot:

>>> from joblib import Parallel, delayed
>>> parallel = Parallel( n_jobs = -1 )
>>> import numpy as np
>>> parallel( delayed( np.sqrt ) ( i**2 ) for i in range( 10 ) )
#          ^  ^^^^^^^     ^^^^     ^^^^   |||
#          |  |||||||     ||||     ||||   vvv
#JOBS(-1):-+  |||||||     ||||     ||||   |||
#DELAYED:-----+++++++     ||||     ||||   |||
#FUN( par ):--------------++++     ||||   |||
#     |||                          ||||   |||
#     +++-FUN(signature-"decl.")---++++   |||
#     ^^^                                 |||
#     |||                                 |||
#     +++-<<<-<iterator>-<<<-<<<-<<<-<<<--+++
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

and the produced result confirms that the call was fully compliant and interpretable.

Example 2: a wrong call:

>>> from joblib import Parallel, delayed
>>> parallel = Parallel( n_jobs = -1 )
>>> import numpy as np
>>> parallel( delayed( np.sqrt( 10 ) ) )          #### THIS SLOC IS KNOWINGLY WRONG
#          ^  ^^^^^^^     ^^^^(????)  ????   ???  ####
#          |  |||||||     ||||        ||||   vvv  ####
#JOBS(-1):-+  |||||||     ||||        ||||   |||  ####
#DELAYED:-----+++++++     ||||        ||||   |||  #### DELAYED( <float64> )
#FUN( par ):--------------++++        ||||   |||  #### GOT NO CALLABLE FUN( par ) 
#     |||                             ||||   |||  ####        BUT A NUMBER
#     +++-FUN(signature-"decl.")------++++   |||  ####        FUN( signature )
#     ^^^                                    |||  ####        NOT PRESENT
#     |||                                    |||  ####        AND FEEDER
#     +++-<<<-<iterator>-<<<-<<<-<<<-<<<-<<<-+++  #### <ITERATOR> MISSING
#                                                 ####
Traceback (most recent call last):                ####   FOR DETAILS, READ THE O/P
  File "<stdin>", line 1, in <module>             ####   AND EXPLANATION BELOW
  File ".../lib/python3.5/site-packages/joblib/parallel.py", line 947, in __call__
    iterator = iter(iterable)
TypeError: 'function' object is not iterable

The result confirms that the O/P used a syntax incompatible with the documented joblib.Parallel( delayed( ... ) ... ), Q.E.D.


The remedy:

Follow the documented joblib.Parallel( delayed( ... ) ... ) syntax:

#centroids, _ = parallel( delayed( kmeans(weights1d, cluster)))
#                                  ^^^^^^(..................)
#                                  ||||||(..................)
#THIS-IS-NOT-A-CALLABLE-BUT-VALUE--++++++(..................)
#
centroids, _ = parallel( delayed( kmeans ) ( weights1d, cluster ) for ... )
#                                 ^^^^^^     ^^^^^^^^^^^^^^^^^^   |||||||
#                                 ||||||     ||||||||||||||||||   vvvvvvv
# CALLABLE FUN()------------------++++++     ||||||||||||||||||   |||||||
#          FUN( <signature> )----------------++++++++++++++++++   |||||||
#               ^^^^^^^^^^^                                       |||||||
#               |||||||||||                                       |||||||
#               +++++++++++------------<<<--feeding-<iterator>----+++++++
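
Applied to the O/P's kmean(), note that the kmeans and vq calls are single, whole-array calls with no <iterator> to feed, so only the per-element lookup even fits the documented shape. A minimal sketch (assuming kmeans / vq come from scipy.cluster.vq, as the O/P's tuple-unpacking suggests; the lookup() helper is illustrative only):

import numpy as np
from joblib import Parallel, delayed
from scipy.cluster.vq import kmeans, vq          # assumed source of kmeans / vq

def lookup( codebook, idx ):                     # an illustrative, callable FUN
    return codebook[idx]                         # handed over as delayed( FUN )( par )

def kmean( layerW, cluster ):
    weights1d    = np.reshape( layerW, -1 )
    centroids, _ = kmeans( weights1d, cluster )  # one call, no <iterator> to feed
    idxs, _      = vq( weights1d, centroids )    # one call, no <iterator> to feed
    #              the documented form:
    #              Parallel( delayed( FUN )( par ) for par in <iterator> )
    weights1d_q  = Parallel( n_jobs = -1 )(
                       delayed( lookup )( centroids, i ) for i in idxs
                       )
    return np.reshape( weights1d_q, np.shape( layerW ) )

That said, a plain, vectorised centroids[idxs] performs the very same lookup with zero process-instantiation or parameter-transfer add-on costs, which is by far the cheaper choice here (more on such costs below).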

The best first step:

is to re-read the documented details of how joblib.Parallel was designed and which usage patterns it supports, so as to get better acquainted with the tool:

joblib.Parallel( n_jobs       = None,   # how many jobs will get instantiated
                 backend      = None,   # a method, how these will get instantiated
                 verbose      = 0,
                 timeout      = None,
                 pre_dispatch = '2 * n_jobs',
                 batch_size   = 'auto',
                 temp_folder  = None,
                 max_nbytes   = '1M',
                 mmap_mode    = 'r',
                 prefer       = None,   # None | { ‘processes’, ‘threads’ }
                 require      = None    # None | ‘sharedmem’ ~CONSTRAINTS backend
                 )

Next, one can grasp a few simple examples (and experiment with and extend them towards one's own intended use case):

      Parallel(  n_jobs = 2 ) ( delayed( sqrt ) ( i ** 2 ) for i in range( 10 ) )
      #          ^              ^^^^^^^  ^^^^     ^^^^^^   |||
      #          |              |||||||  ||||     ||||||   vvv
      #JOBS:-----+              |||||||  ||||     ||||||   |||
      #DELAYED:-----------------+++++++  ||||     ||||||   |||
      #FUN( par ):-----------------------++++     ||||||   |||
      #     |||                                   ||||||   |||
      #     +++--FUN(-signature-"declaration"-)---++++++   |||
      #     ^^^                                            |||
      #     |||                                            |||
      #     +++-<<<-<iterator>-<<<-<<<-<<<-<<<-<<<-<<<-<<<-+++

      Parallel(  n_jobs = -1 ) ( 
                 delayed( myTupleConsumingFUN ) ( # aFun( aTuple = ( a, b, c, d ) )
                           aTupleOfParametersGeneratingFUN( i ) )
                 for                                        i in range( 10 )
                 )
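
The second pattern above uses placeholder names; a runnable toy instance of it (both helper names below are illustrative stand-ins, not any joblib API) might read:

from joblib import Parallel, delayed

def my_tuple_consuming_fun( aTuple ):        # stands in for myTupleConsumingFUN
    a, b, c, d = aTuple
    return a * b + c - d

def a_tuple_generating_fun( i ):             # stands in for aTupleOfParametersGeneratingFUN
    return ( i, i + 1, i + 2, i + 3 )

results = Parallel( n_jobs = -1 )(
              delayed( my_tuple_consuming_fun )( a_tuple_generating_fun( i ) )
              for i in range( 10 )
              )
print( results )                             # [-1, 1, 5, 11, 19, 29, 41, 55, 71, 89]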

Next: try to understand the costs and the limits of the n_jobs instantiations:

The default joblib backend runs each function call in an isolated Python process, so the calls cannot mutate a common Python object defined in the main program. If the parallel function really needs to rely on the shared-memory semantics of threads, this has to be made explicit with require='sharedmem'. Keep in mind, though, that relying on shared-memory semantics is probably suboptimal from a performance point of view, as concurrent access to a shared Python object will suffer from lock contention.
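
A small sketch of what this means in practice (outputs shown as expected from the documented behaviour; ordering under threads may vary):

from joblib import Parallel, delayed

shared = []                                  # a common Python object in __main__

def collect( i ):
    shared.append( i )                       # tries to mutate the common object

# default, process-based backend: each worker mutates its own isolated copy
Parallel( n_jobs = 2 )( delayed( collect )( i ) for i in range( 4 ) )
print( shared )                              # []  -- the main-process list stays untouched

# require = 'sharedmem' forces a thread-based run, so the mutation is visible
Parallel( n_jobs = 2, require = 'sharedmem' )( delayed( collect )( i ) for i in range( 4 ) )
print( shared )                              # [0, 1, 2, 3]  ( ordering may vary )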

Using the thread-based backend permits such "sharing", but it comes at an immense cost: the threads re-introduce GIL-stepping, which re-[SERIAL]-ises the code-execution flow back into a one-after-another, GIL-lock-stepped sequence. For computing-intensive processing this yields worse performance than the original, pure-[SERIAL] code (while this mode can help in latency-masking use cases, where waiting for a network response may let a thread release the GIL-lock and let other threads go ahead and continue the work).
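
A rough, machine-dependent sketch of that effect on a CPU-bound toy workload (absolute timings are illustrative only):

import time
from joblib import Parallel, delayed

def burn( n ):                               # a purely CPU-bound toy workload
    s = 0
    for i in range( n ):
        s += i * i
    return s

for prefer in ( 'processes', 'threads' ):
    t0 = time.perf_counter()
    Parallel( n_jobs = 4, prefer = prefer )( delayed( burn )( 2_000_000 ) for _ in range( 8 ) )
    print( prefer, '->', round( time.perf_counter() - t0, 2 ), '[s]' )

# 'threads' will typically be no faster than a plain serial loop here:
# the GIL re-[SERIAL]-ises the CPU-bound work, exactly as described above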

There are steps one can implement so that separate, process-based computations can communicate such a need, yet that comes at some add-on costs, too.

Computing-intensive problems have to balance the appetite for ultimate performance (using more cores) against the add-on costs of keeping each unit-of-work isolated (split off) and of the parameter-transfers and results-returns, all of which can easily cost more than a wrongly designed intent to harness the just-[CONCURRENT] form of process scheduling that joblib.Parallel makes available.
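
A rough sketch of how such add-on costs can dominate: shipping a large parameter for a trivial amount of work per call (the sizes and the workload are illustrative assumptions):

import time
from joblib import Parallel, delayed

big = list( range( 2_000_000 ) )             # a large parameter to pickle & ship

def tiny_work( x ):                          # trivial work per transferred payload
    return sum( x )

t0 = time.perf_counter()
Parallel( n_jobs = 4 )( delayed( tiny_work )( big ) for _ in range( 4 ) )
print( 'parallel:', round( time.perf_counter() - t0, 2 ), '[s]' )

t0 = time.perf_counter()
[ tiny_work( big ) for _ in range( 4 ) ]
print( 'serial  :', round( time.perf_counter() - t0, 2 ), '[s]' )

# pickling + transferring a 2M-element list per call usually costs far more
# than the sum() work it buys, so the "parallel" run ends up slower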

For more details on joblib.Parallel, see the documentation.

More details on the add-on costs and on the impact of the atomicity-of-work on parallel speedups are also worth reviewing.

Answered 2019-08-26T14:52:11.297