7

我正在试验 Python 2.6 中的新多处理模块。我正在创建几个进程,每个进程都有自己的 multiprocessor.JoinableQueue 实例。每个进程产生一个或多个工作线程(threading.Thread 的子类),它们共享 JoinableQueue 实例(通过每个线程的__init__方法传入)。它似乎通常可以工作,但偶尔会出现无法预料的失败并出现以下错误:

  File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
    self.queue.task_done()
  File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

我的 Queue get() 和 task_done() 调用紧随其后,因此它们应该相等。有趣的是,这似乎只有在 get() 和 task_done() 之间完成的工作非常快时才会发生。插入一个小的time.sleep(0.01)似乎可以缓解这个问题。

有什么想法吗?我可以使用带线程的多处理器队列而不是更传统的(Queue.Queue)吗?

谢谢!

-布赖恩

4

4 回答 4

4

我还没有在 2.6 中尝试过多处理,但是我玩了很多 pyprocessing(因为它在 2.5 中被调用)。

我可以看到您正在寻找多个进程,每个进程分别产生一组线程。

由于您使用的是多处理模块,因此我建议您使用多进程而不是多线程方法,这样您会遇到更少的问题,例如死锁等。

创建一个队列对象。http://pyprocessing.berlios.de/doc/queue-objects.html

要创建多进程环境,请使用池:http ://pyprocessing.berlios.de/doc/pool-objects.html它将为您管理工作进程。然后,您可以将异步/同步应用到工作人员,如果需要,还可以为每个工作人员添加回调。但请记住回调是一个常见的代码块,它应该立即返回(如文档中所述)

一些附加信息:如果需要,创建一个管理器http://pyprocessing.berlios.de/doc/manager-objects.html来管理对队列对象的访问。您必须为此共享队列对象。但好处是,一旦共享和管理,您可以通过创建代理对象在整个网络上访问这个共享队列。这将使您能够将集中式共享队列对象的方法调用为(显然)任何网络节点上的本机方法。

这是文档中的代码示例

可以在一台机器上运行管理服务器并让客户端从其他机器使用它(假设所涉及的防火墙允许它)。运行以下命令为远程客户端可以使用的共享队列创建服务器:

>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()

一个客户端可以访问服务器,如下所示:

>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')

如果您坚持使用安全线程的东西,PEP371(多处理)引用此http://code.google.com/p/python-safethread/

于 2008-12-05T02:08:18.530 回答
2

您应该将 Queue 对象作为目标的参数传递。

多处理文档中的示例:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

 if __name__ == '__main__':
     q = Queue()
     p = Process(target=f, args=(q,))
     p.start()
     print q.get()    # prints "[42, None, 'hello']"
     p.join()

队列是线程和进程安全的。

于 2008-12-05T00:59:29.853 回答
1

你可能会遇到这个错误:

http://bugs.python.org/issue4660

于 2010-02-07T19:43:37.820 回答
-1

感谢您及时的回复。正如您所说明的,我将 multiprocessing.Queue 实例作为参数传递给每个 Process 。故障似乎发生在线程中。我通过子类化 threading.Thread 并将队列传递给每个线程实例的“ init ”方法来创建它们。这似乎是将队列传递给线程子类的公认方式。我唯一认为多处理队列可能与线程不兼容(尽管它们应该是线程安全的)。

于 2008-12-05T01:12:12.683 回答