我不了解 Java FixedThreadPool
,但我可以修复您的代码 ;-)
你显然不想使用res.get()
,对吧?所以我会忽略那部分。问题.apply_async()
在于您没有正确调用它。我很惊讶没有提出任何例外!参数列表应该是一个元组,而不是一个列表(对于内置apply()
函数)。对于关键字参数参数,None
不起作用。如果您没有要传递的关键字参数,请将其保留(如下所示)或传递一个空字典({}
)。
此处的其他更改更具装饰性:引入了 IO 锁以防止终端输出被打乱,并引入了__name__ == "__main__"
检查以确保代码清晰,以便代码也可以在 Windows 上运行:
import multiprocessing, time
def getlock(lck):
global iolock
iolock = lck
def printStuff(number):
with iolock:
print number
if number % 2:
time.sleep(0.5)
return number*number
def execute():
def resultAggregator(n):
with iolock:
print 'aggregator called...'
a.append(n)
for i in range(34):
pool.apply_async(printStuff, (i,), callback=resultAggregator)
with iolock:
print "called for ", i
if __name__ == "__main__":
a = []
iolock = multiprocessing.Lock()
pool = multiprocessing.Pool(5, getlock, (iolock,))
execute()
pool.close()
pool.join()
print a
后来:错误
None
事实证明,如果您传递关键字参数,实际上会引发异常- 但multiprocessing
会抑制它。唉,这是异步噱头的一个常见问题:没有引发异常的好方法!它们发生在与您的“主程序”当时正在做什么无关的上下文中。
至少 Python 3.3.2 的实现也.apply_async()
有一个可选error_callback
参数。不知道什么时候介绍的。如果您提供它,异步异常将传递给它,因此您可以决定如何报告(或记录,或忽略......)它们。添加此功能:
def ouch(e):
raise e
并将调用更改为:
pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)
产生一个ouch()
以这个异常细节结尾的回溯:
TypeError: printStuff() argument after ** must be a mapping, not NoneType
因此,至少,使用足够新的 Python,您可以安排不让异步错误无形地通过。
问答
您能解释一下 getLock() 中的“全局 iolock”声明吗?我认为它为每个子进程定义了一个全局变量,但是将名称从 iolock 更改为 iiolock in ? “main”使工作进程不知道 iolock。
抱歉,我无法从那确切地看出你做了什么。该名称iolock
旨在成为所有进程(主要和子进程)中的全局名称。那是因为我的代码中的所有进程都使用name iolock
。
例如,如果“通过更改名称......”您的意思是您只是替换了
iolock = multiprocessing.Lock()
至
iiolock = multiprocessing.Lock()
那么你会有一个例外:
Traceback (most recent call last):
...
pool = multiprocessing.Pool(5, getlock, (iolock,))
NameError: global name 'iolock' is not defined
如果您也更改了该行 ( pool = ...
) 以使用,那么当您在主进程中尝试使用iiolock
时,您会得到一个不同的异常:resultAggregator
iolock
Exception in thread Thread-3:
Traceback (most recent call last):
...
File "mpool.py", line 19, in resultAggregator
with iolock:
NameError: global name 'iolock' is not defined
所以我不知道你到底做了什么。
此外,在执行中声明 printStuff 会导致静默错误(代码在“要求”打印之后没有进展)
那是行不通的。Python 中的函数未声明 -def
是可执行语句。printStuff
执行之前不存在的代码def printstuff
。因为只有主程序执行,所以里面的execute()
函数只存在于主程序中。这是真的def
execute()
pool.apply_async(printStuff, (i,), callback=resultAggregator)
传递 printStuff
给子进程,但所有传递的东西都通过发送端的酸洗和接收端的解酸来工作,并且函数对象不能被酸洗。你确定没有收到这样的错误吗?:
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
(我在这里使用 Python 3 - 也许它在 Python 2 下有所不同)。
在任何情况下,Python 都不是 Java——不要因为嵌套而疯狂——保持简单 ;-) 子进程使用的每个函数和类都应该在模块级别定义(class
也是 Python 中的可执行语句!唯一的Python 中的“声明”是global
andnonlocal
语句)。
更多问答
你是对的假设。我在除 main 之外的所有地方都改为 iiolock
仍然不知道你做了什么。对于这样的事情,你真的必须发布代码,而不仅仅是描述你做了什么。我只能猜测——这真的很痛苦 ;-) 这个怎么样:如果你有一个新问题,打开一个新问题?
根据您在此处描述的内容(“在除 main 之外的所有地方”)execute()
,您会iiolock
在更改旧的)。但是你没有提到一个例外,所以我猜你并没有真正按照你所说的去做(“在除主要之外的所有地方”)。execute()
iolock
main()
并期望新进程只是获得作为参数传递给初始化函数的相同锁,但每个都有自己的全局 iiolock 变量。无论如何,多个进程如何共享同一个变量(每个进程的内存内容不是不同吗???)。
有两个答案;-) 最直接相关的一个是iolock
(在我的原始代码中 - 我真的不知道你的代码现在是什么样子)是一个由multiprocessing
(它是一个mp.Lock()
)创建的对象,并通过mp.Pool()
太传递给子进程:
pool = multiprocessing.Pool(5, getlock, (iolock,))
^^^^^^
mp
控制这里的一切,并在幕后做一个世界性的事情,以确保它mp.Lock()
在整个进程中具有一致的状态。它不仅仅是任何旧变量,它是mp
众所周知的东西,它的所有行为都mp
实现了。
第二个答案,到目前为止不适用于本期的任何代码,您还可以使用mp
. 请参阅 和mp.Value
的mp.Array
文档multiprocessing.sharedctypes
。这些值是真正(物理上)跨进程共享的。
但除了那些(由 实现的 mp
对象和从 获得的“共享内存” mp
)之外,您是对的:没有其他值在进程之间共享(无论是物理上的还是语义上的)。mp
通信所有其他类型的对象值是通过在不同的同步点(比如当你.put()
在一个对象mp.Queue
和另一个进程上)进行酸洗(序列化)和取消酸洗(从泡菜字符串重建对象的值)来完成的.get()
。