1

尝试从 Cygwin 的多处理模块创建队列时出现奇怪的间歇性错误:

Traceback (most recent call last):
  File "test.py", line 3, in <module>
    multiprocessing.Queue()
  File "/usr/lib/python2.6/multiprocessing/__init__.py", line 213, in Queue
    return Queue(maxsize)
  File "/usr/lib/python2.6/multiprocessing/queues.py", line 37, in __init__
    self._rlock = Lock()
  File "/usr/lib/python2.6/multiprocessing/synchronize.py", line 117, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1)
  File "/usr/lib/python2.6/multiprocessing/synchronize.py", line 49, in __init__
    sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 17] File exists

重现这一点的最少代码只是:

import multiprocessing

multiprocessing.Queue()

虽然错误只发生在大约 25% 的时间里。

目前为了“解决”这个问题,我只有一个 while 循环,它不断创建Queues 直到错误没有出现,但我宁愿弄清楚它为什么会发生。我尝试查看记录的源文件,但即使一直追溯到 c 源也没有发现任何问题。

我在 Windows 7 64 位上运行来自 Cygwin 的 python 2.6.7。如果我通过本机 Windows python 从 cmd 运行它,即不是从 Cygwin 运行,则不会出现问题。

更新:更仔细地查看源代码,看起来 CreateSemaphore C 函数接受了“名称”参数,如果具有此名称的信号量已经存在,ERROR_ALREADY_EXISTS则会标记错误。但是,在 Modules/_multiprocessing/semaphore.c 中的 python 源代码中,该函数在没有名称参数的情况下调用,因此不应发生这种情况。我猜这只是 cygwin 信号量实现中的一个怪癖。

Edit2:我现在有这样的设置:

import multiprocessing

for i in range(10):
    count = 0
    while True:
        try:
            q = multiprocessing.Queue()
            break
        except OSError as exc:
            if exc.errno == 17:
                count += 1
            else:
                raise # catch other errors, but this has never happened
    print "iterations %d" % count

我注意到一个怪癖:Queue 构造函数失败的次数总是小于或等于 3003,而且恰好 3000 也经常出现。此外,一旦构造函数成功一次,在 for 循环的其余迭代中将不会再次失败。

我仍然很难过!我曾尝试在队列本身上使用 gc.collect、time.sleep、调用 close 或 del,但这些似乎都没有任何影响。如果这确实是操作系统清理信号量的问题,是否有办法通过系统调用“强制”这种情况发生?

4

1 回答 1

2

在 cygwin python 2.6.8中也有问题

$ python
Python 2.6.8 (unknown, Jun  9 2012, 11:30:32) 
[GCC 4.5.3] on cygwin

错误(下)是因为multiprocessing.futures使用 multiprocessing.queue. 无赖!我希望将它用作 gevent 的替代品(因为 gevent 使用猴子补丁)

$ ./eg3
Traceback (most recent call last):
  File "./eg3", line 13, in <module>
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
  File "/usr/lib/python2.6/site-packages/concurrent/futures/process.py", line 275, in __init__
    EXTRA_QUEUED_CALLS)
  File "/usr/lib/python2.6/site-packages/multiprocessing-2.6.2.1-py2.6-cygwin-1.7.16-i686.egg/multiprocessing/__init__.py", line 219, in Queue
    return Queue(maxsize)
  File "/usr/lib/python2.6/site-packages/multiprocessing-2.6.2.1-py2.6-cygwin-1.7.16-i686.egg/multiprocessing/queues.py", line 37, in __init__
    self._rlock = Lock()
  File "/usr/lib/python2.6/site-packages/multiprocessing-2.6.2.1-py2.6-cygwin-1.7.16-i686.egg/multiprocessing/synchronize.py", line 117, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1)
  File "/usr/lib/python2.6/site-packages/multiprocessing-2.6.2.1-py2.6-cygwin-1.7.16-i686.egg/multiprocessing/synchronize.py", line 49, in __init__
    sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 17] File exists
于 2012-11-30T21:31:38.277 回答