0

我有一组受 CPU 限制的进程,只要它们唯一的同步是将作业从队列中取出,它们就可以将任意数量的内核利用率提高到 100%。

当我添加一个 RLock 以避免在更新文件系统中的目录时出现最坏的情况时,CPU/核心利用率下降到 60%,就好像进程已经成为 IO 绑定一样。

有什么解释?

这与整体速度无关。它与 CPU/核心利用率有关,因此 Python 2/3、Cython 或 PyPy 应该无关紧要。

更新:我对自己的问题给出了部分答案。我的特殊情况的最终解决方案包括修改文件系统的访问方式,因此不需要同步(“某种”映射/减少)。

4

2 回答 2

1

这一切都取决于如何multiprocessing实施RLock。我知道多处理可以跨主机工作,这意味着同步原语可以跨套接字工作。如果这是真的,它会引入很多(可变)延迟。

所以我做了一个实验。

RLock这是一个被多个进程使用的例子(以防止所有锁都在同一个进程中的任何快速路径):

#!/usr/bin/env python
import multiprocessing
from time import sleep

lock = multiprocessing.RLock()

def noop(myname):
    # nonlocal lock
    sleep(0.5)
    print myname, "acquiring lock"
    with lock:
        print myname, "has lock"
        sleep(0.5)
    print myname, "released lock"

sProc1 = multiprocessing.Process(target=noop, args=('alice',))
sProc2 = multiprocessing.Process(target=noop, args=('bob',))

sProc1.start()
sProc2.start()

sProc1.join()
sProc2.join()

当它运行时,它的输出看起来像这样:

alice acquiring lock
alice has lock
bob acquiring lock
alice released lock
bob has lock
bob released lock

太好了,现在通过strace使用系统调用跟踪运行它。

在下面的命令中,该-ff选项告诉工具“跟踪fork()”调用,即跟踪由主进程启动的任何进程。为简洁起见,我也在使用-e trace=futex,write,它根据我在发布之前做出的结论过滤输出。通常你会在没有-e选项的情况下运行并使用文本编辑器 /grep来探索事后发生的事情。

# strace -ff -e trace=futex,write ./traceme.py
futex(0x7fffeafe29bc, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 1, NULL, 7fb92ac6c700) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7fb92a8540b0, FUTEX_WAKE_PRIVATE, 2147483647) = 0
futex(0x7fb92aa7131c, FUTEX_WAKE_PRIVATE, 2147483647) = 0
write(3, "\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 32) = 32
Process 25873 attached
Process 25874 attached
Process 25872 suspended
[pid 25873] write(1, "alice acquiring lock\n", 21alice acquiring lock
) = 21
[pid 25873] write(1, "alice has lock\n", 15alice has lock
) = 15
[pid 25874] write(1, "bob acquiring lock\n", 19bob acquiring lock
) = 19
[pid 25874] futex(0x7fb92ac91000, FUTEX_WAIT, 0, NULL <unfinished ...>
[pid 25873] futex(0x7fb92ac91000, FUTEX_WAKE, 1 <unfinished ...>
[pid 25874] <... futex resumed> )       = 0
[pid 25873] <... futex resumed> )       = 1
[pid 25874] write(1, "bob has lock\n", 13 <unfinished ...>
bob has lock
[pid 25873] write(1, "alice released lock\n", 20 <unfinished ...>
alice released lock
[pid 25874] <... write resumed> )       = 13
[pid 25873] <... write resumed> )       = 20
Process 25872 resumed
Process 25873 detached
[pid 25872] --- SIGCHLD (Child exited) @ 0 (0) ---
Process 25872 suspended
[pid 25874] write(1, "bob released lock\n", 18bob released lock
) = 18
Process 25872 resumed
Process 25874 detached
--- SIGCHLD (Child exited) @ 0 (0) ---

从 print ( write()) 消息和调用哪个阻塞和稍后恢复的模式来看,使用或“Fast Userspace Mutex”实现futex似乎很清楚。顾名思义,这是同步的不错选择。RLockfutex

当一个进程在系统调用futex中被阻塞时,例如该进程出于所有意图和目的而阻塞 I/O。

所有这一切都意味着它multiprocessing.RLock是高效的,并且按照它的设计目的去做。因此,如果您的应用程序在使用同步时的性能低于您的预期,那么您的算法很可能是罪魁祸首。

于 2013-03-09T15:32:53.583 回答
0

测量表明它RLock不受 I/O 限制。尽管与RLock(or Semaphore) 同步的代码似乎比不同步的代码慢 6 倍,但执行最简单 I/O 的代码要慢几个数量级。

下面是我用来测量RLockPython 和 PyPy 开销的单进程代码。我仍然不明白为什么RLock单个进程的开销如此之高,或者为什么开销不能维持 CPU/核心利用率,但结果表明使用标准同步原语比尝试自己滚动更有效.

# lock.py
import sys
import os
import timeit
from random import random
from multiprocessing import RLock, Semaphore

N=8*1024*1024

lock = RLock()
semaphore = Semaphore()

def touch(fname, times=None):
    if not os.path.isfile(fname):
        open(fname, 'wa').close()
    else:
        with file(fname, 'a'):
            os.utime(fname, times)

def wolock():
    return random()

def wlock():
    with lock:
        return random()

def wsem():
    with semaphore:
        return random()

def wfile():
   os.path.isfile('lock')
   touch('lock') 
   try:
        return random()
   finally:
        os.unlink('lock')

def run(get):
    result = []
    for i in xrange(N):
        result.append(get())
    return result

def main():
    t0 = timeit.timeit('lock.wolock()', setup='import lock', number=N)
    print '%8.3f %8.2f%% %s' % (t0, 100, 'unsynchronized')
    t1 = timeit.timeit('lock.wlock()', setup='import lock', number=N)
    print '%8.3f %8.2f%% %s' % (t1, 100*t1/t0, 'rlock')
    t2 = timeit.timeit('lock.wsem()', setup='import lock', number=N)
    print '%8.3f %8.2f%% %s' % (t2, 100*t2/t0, 'semaphore')
    t = timeit.timeit('lock.wfile()', setup='import lock', number=N)
    print '%8.3f %s' % (t, 'file system')

if __name__ == '__main__':
    main()
于 2013-03-09T13:47:54.197 回答