0

我需要在 Python 中使用进程池。要求如下:

池的大小固定为 10。我有许多作业要提交到池(N > 10)。在 Java 中,可以为此目的使用 FixedThreadPool。提交作业,一旦线程完成执行任务,客户端就可以提交下一个任务。所以如果当前有 10 个任务在运行,客户端无法提交第 11 个任务。但是如果一个完成,客户端可以将下一个任务提交给可用线程。

这是我用来测试一些想法的代码:

import multiprocessing, time


def printStuff(number):
    print number
    if number % 2 : time.sleep(0.5)
    return number*number

pool = multiprocessing.Pool(5, None, None, None)   
a = []

def execute():
    def resultAggregator(n):
        print 'aggregator called...'
        a.append(n)
    for i in range (0, 34):

        # With callback
        #pool.apply_async(printStuff, [i], None, resultAggregator)
        #print "called for ", i

        # Without callback
        res = pool.apply_async(printStuff, [i])
        print "called for" , i, "returned ", res.get()

    pool.close() # disable sumitting any more tasks
    pool.join() # wait for all the worker to finish

execute()
print a

res.get() 阻塞直到printStuff返回。使用回调变体甚至不会调用printStuff. 注意在这两种情况下a最后都是空的。

任何想法如何实现上述行为?代码片段会很棒,但它足以指向我不知道的现有库函数,或者只是提出一些想法。

4

1 回答 1

2

我不了解 Java FixedThreadPool,但我可以修复您的代码 ;-)

你显然不想使用res.get(),对吧?所以我会忽略那部分。问题.apply_async()在于您没有正确调用它。我很惊讶没有提出任何例外!参数列表应该是一个元组,而不是一个列表(对于内置apply()函数)。对于关键字参数参数,None不起作用。如果您没有要传递的关键字参数,请将其保留(如下所示)或传递一个空字典({})。

此处的其他更改更具装饰性:引入了 IO 锁以防止终端输出被打乱,并引入了__name__ == "__main__"检查以确保代码清晰,以便代码也可以在 Windows 上运行:

import multiprocessing, time

def getlock(lck):
    global iolock
    iolock = lck

def printStuff(number):
    with iolock:
        print number
    if number % 2:
        time.sleep(0.5)
    return number*number

def execute():
    def resultAggregator(n):
        with iolock:
            print 'aggregator called...'
        a.append(n)

    for i in range(34):
        pool.apply_async(printStuff, (i,), callback=resultAggregator)
        with iolock:
            print "called for ", i

if __name__ == "__main__":
    a = []
    iolock = multiprocessing.Lock()
    pool = multiprocessing.Pool(5, getlock, (iolock,))   
    execute()
    pool.close()
    pool.join()
    print a

后来:错误

None事实证明,如果您传递关键字参数,实际上会引发异常- 但multiprocessing会抑制它。唉,这是异步噱头的一个常见问题:没有引发异常的好方法!它们发生在与您的“主程序”当时正在做什么无关的上下文中。

至少 Python 3.3.2 的实现也.apply_async()有一个可选error_callback参数。不知道什么时候介绍的。如果您提供它,异步异常将传递给它,因此您可以决定如何报告(或记录,或忽略......)它们。添加此功能:

def ouch(e):
    raise e

并将调用更改为:

pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)

产生一个ouch()以这个异常细节结尾的回溯:

TypeError: printStuff() argument after ** must be a mapping, not NoneType

因此,至少,使用足够新的 Python,您可以安排不让异步错误无形地通过。

问答

您能解释一下 getLock() 中的“全局 iolock”声明吗?我认为它为每个子进程定义了一个全局变量,但是将名称从 iolock 更改为 iiolock in ? “main”使工作进程不知道 iolock。

抱歉,我无法从那确切地看出你做了什么。该名称iolock旨在成为所有进程(主要和子进程)中的全局名称。那是因为我的代码中的所有进程都使用name iolock

例如,如果“通过更改名称......”您的意思是您只是替换了

iolock = multiprocessing.Lock()

iiolock = multiprocessing.Lock()

那么你会有一个例外:

Traceback (most recent call last):
  ...
    pool = multiprocessing.Pool(5, getlock, (iolock,))
NameError: global name 'iolock' is not defined

如果您也更改了该行 ( pool = ...) 以使用,那么当您在主进程中尝试使用iiolock时,您会得到一个不同的异常:resultAggregatoriolock

Exception in thread Thread-3:
Traceback (most recent call last):
  ...
  File "mpool.py", line 19, in resultAggregator
    with iolock:
NameError: global name 'iolock' is not defined

所以我不知道你到底做了什么。

此外,在执行中声明 printStuff 会导致静默错误(代码在“要求”打印之后没有进展)

那是行不通的。Python 中的函数未声明 -def可执行语句。printStuff执行之前不存在的代码def printstuff。因为只有主程序执行,所以里面的execute()函数只存在于主程序中。这是真的defexecute()

    pool.apply_async(printStuff, (i,), callback=resultAggregator)

传递 printStuff给子进程,但所有传递的东西都通过发送端的酸洗和接收端的解酸来工作,并且函数对象不能被酸洗。你确定没有收到这样的错误吗?:

_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

(我在这里使用 Python 3 - 也许它在 Python 2 下有所不同)。

在任何情况下,Python 都不是 Java——不要因为嵌套而疯狂——保持简单 ;-) 子进程使用的每个函数和类都应该在模块级别定义(class也是 Python 中的可执行语句!唯一的Python 中的“声明”是globalandnonlocal语句)。

更多问答

你是对的假设。我在除 main 之外的所有地方都改为 iiolock

仍然不知道做了什么。对于这样的事情,你真的必须发布代码,而不仅仅是描述你做了什么。我只能猜测——这真的很痛苦 ;-) 这个怎么样:如果你有一个新问题,打开一个新问题?

根据您在此处描述的内容(“在除 main 之外的所有地方”)execute(),您iiolock在更改旧的)。但是你没有提到一个例外,所以我你并没有真正按照你所说的去做(“在除主要之外的所有地方”)。execute()iolockmain()

并期望新进程只是获得作为参数传递给初始化函数的相同锁,但每个都有自己的全局 iiolock 变量。无论如何,多个进程如何共享同一个变量(每个进程的内存内容不是不同吗???)。

有两个答案;-) 最直接相关的一个是iolock(在我的原始代码中 - 我真的不知道你的代码现在是什么样子)是一个由multiprocessing(它是一个mp.Lock())创建的对象,并通过mp.Pool()太传递给子进程:

pool = multiprocessing.Pool(5, getlock, (iolock,)) 
                                         ^^^^^^

mp控制这里的一切,并在幕后做一个世界性的事情,以确保它mp.Lock()在整个进程中具有一致的状态。它不仅仅是任何旧变量,它是mp众所周知的东西,它的所有行为都mp实现了。

第二个答案,到目前为止不适用于本期的任何代码,您还可以使用mp. 请参阅 和mp.Valuemp.Array文档multiprocessing.sharedctypes。这些值是真正(物理上)跨进程共享的。

但除了那些(由 实现 mp对象和从 获得的“共享内存” mp)之外,您是对的:没有其他值在进程之间共享(无论是物理上的还是语义上的)。mp通信所有其他类型的对象值是通过在不同的同步点(比如当你.put()在一个对象mp.Queue和另一个进程上)进行酸洗(序列化)和取消酸洗(从泡菜字符串重建对象的值)来完成的.get()

于 2013-10-11T19:06:45.467 回答