10

我试图在带有池的 python 中使用多处理包。

我有由 map_async 函数调用的函数 f:

from multiprocessing import Pool

def f(host, x):
    print host
    print x

hosts = ['1.1.1.1', '2.2.2.2']
pool = Pool(processes=5)
pool.map_async(f,hosts,"test")
pool.close()
pool.join()

此代码有下一个错误:

Traceback (most recent call last):
  File "pool-test.py", line 9, in <module>
    pool.map_async(f,hosts,"test")
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 290, in map_async
    result = MapResult(self._cache, chunksize, len(iterable), callback)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 557, in __init__
    self._number_left = length//chunksize + bool(length % chunksize)
TypeError: unsupported operand type(s) for //: 'int' and 'str'

我不知道如何将超过 1 个参数传递给 f 函数。有什么办法吗?

4

3 回答 3

13

"test"被解释为map_async'schunksize关键字参数(请参阅文档)。

您的代码可能应该是(这里从我的 IPython 会话中复制粘贴):

from multiprocessing import Pool

def f(arg):
    host, x = arg
    print host
    print x

hosts = ['1.1.1.1', '2.2.2.2']
args = ((host, "test") for host in hosts)
pool = Pool(processes=5)
pool.map_async(f, args)
pool.close()
pool.join()
## -- End pasted text --

1.1.1.1
test
2.2.2.2
test

注意:在 Python 3 中,您可以使用starmap,它将从元组中解包参数。你将能够避免host, x = arg明确地做。

于 2013-05-14T11:46:56.620 回答
6

Pool 在 Python 3 中返回一个上下文管理器,因此可以使用 with 语句。这避免了异常问题并且意味着没有必要关闭和加入。在这种情况下,函数总是接收 x 变量的常量,因此可以通过部分评估来处理。map_async 是惰性的,因此我们需要获取结果以执行操作,不妨使用 map。因此:

from multiprocessing import Pool
from functools import partial

def f(host, x):
    print(host)
    print(x)

hosts = ('1.1.1.1', '2.2.2.2')
with Pool(processes=5) as pool:
    pool.map(partial(f, x='test'), hosts)

结果是:

1.1.1.1
测试
2.2.2.2
测试
于 2015-05-17T12:44:21.027 回答
1

我记得, Pool().map() 和 .map_async() 特别只接受一个参数。这个限制可以通过传递一个列表来解决,但是当然你需要一个定制的函数来接受一个列表(like)对象作为参数。

一种方法是编写一次自定义代码——也就是一个通用的“函数+参数”包装器。我做了这样的事情(注意:这只是部分测试):

def tmp_test():
    # a short test script:
    #
    A=[[1,2], [2,3], [4,5], [6,7]]
    P=mpp.Pool(mpp.cpu_count())
    X=P.map_async(map_helper, [[operator.eq]+a for a in A])
    #
    return X.get()


def null_funct(args=[], kwargs={}):
    # a place-holder 
    pass
#
def map_helper(args_in = [null_funct, [], {}]):
    # helper function for pool.map_async(). pass data as a list(-like object):
    # [function, [args], {kwargs}] (though we'll allow for some mistakes).
    #
    funct = args_in[0]
    #
    # allow for different formatting options:
    if not (isinstance(args_in[1], list) or isinstance(args_in[1], tuple) or isinstance(args_in[1], dict)):
        # probably passed a list of parameters. just use them:
        args = args_in[1:]
        #
        return funct(*args)
    #
    # if the args are "properly" formatted:
    args=[]
    kwargs = {}
    for arg in args_in[1:]:
        # assign list types to args, dict types to kwargs...
        if isinstance(arg, list) or isinstance(arg, tuple): args += arg
        if isinstance(arg, dict): kwargs.update(arg)
    return funct(*args, **kwargs)
于 2014-09-18T18:44:31.407 回答