我会使用不同的方法:
fcond = threading.Condition()
fargs = set()
def f(x):
with fcond:
while x in fargs:
fcond.wait()
fargs.add(x) # this thread has exclusive rights to use `x`
# do useful stuff with x
# any other thread trying to call f(x) will
# block in the .wait above()
with fcond:
fargs.remove(x) # we're done with x
fcond.notify_all() # let blocked threads (if any) proceed
条件有一个学习曲线,但是一旦爬上去,它们就可以更容易地编写正确的线程安全、无竞争的代码。
原代码的线程安全
@JimMischel 在评论中询问原始的使用是否defaultdict
受种族影响。好问题!
答案是 - 唉 - “你将不得不盯着你的特定 Python 的实现”。
假设 CPython 实现:如果提供默认值的任何代码defaultdict
调用 Python 代码或释放 GIL(全局解释器锁)的 C 代码,则 2 个(或更多)线程可以“同时”调用withlock_dict[x]
相同的x
尚未在字典中,并且:
- 线程 1 看到它
x
不在字典中,获得锁,然后丢失它的时间片(在x
字典中设置之前)。
- 线程 2 看到它
x
不在字典中,并且还获得了锁。
- 其中一个线程的锁最终在字典中,但两个线程都执行
f(x)
。
盯着 3.4.0a4+(当前开发负责人)的源代码,defaultdict
并且threading.Lock
都是由不发布 GIL 的 C 代码实现的。我不记得早期版本是否在不同时间实现了全部或部分Pythondefaultdict
或threading.Lock
在 Python 中实现。
我建议的替代代码充满了用 Python 实现的东西(所有threading.Condition
方法),但在设计上是无竞争的——即使你使用的是旧版本的 Python,集合也在 Python 中实现(只有在保护下才能访问集合条件变量的锁)。
每个参数一个锁
如果没有条件,这似乎要困难得多。在最初的方法中,我相信您需要保留想要使用的线程计数x
,并且您需要一个锁来保护这些计数并保护字典。我为此想出的最好的代码是如此冗长,以至于将它放在上下文管理器中似乎是最明智的。要使用,请为每个需要它的函数创建一个参数储物柜:
farglocker = ArgLocker() # for function `f()`
然后f()
可以简单地对主体进行编码:
def f(x):
with farglocker(x):
# only one thread at a time can run with argument `x`
当然,条件方法也可以包装在上下文管理器中。这是代码:
import threading
class ArgLocker:
def __init__(self):
self.xs = dict() # maps x to (lock, count) pair
self.lock = threading.Lock()
def __call__(self, x):
return AllMine(self.xs, self.lock, x)
class AllMine:
def __init__(self, xs, lock, x):
self.xs = xs
self.lock = lock
self.x = x
def __enter__(self):
x = self.x
with self.lock:
xlock = self.xs.get(x)
if xlock is None:
xlock = threading.Lock()
xlock.acquire()
count = 0
else:
xlock, count = xlock
self.xs[x] = xlock, count + 1
if count: # x was already known - wait for it
xlock.acquire()
assert xlock.locked
def __exit__(self, *args):
x = self.x
with self.lock:
xlock, count = self.xs[x]
assert xlock.locked
assert count > 0
count -= 1
if count:
self.xs[x] = xlock, count
else:
del self.xs[x]
xlock.release()
那么哪种方式更好呢?使用条件 ;-) 这种方式“几乎显然是正确的”,但每个参数锁定(LPA)方法有点让人头疼。LPA 方法确实有一个优点,当一个线程完成后x
,唯一允许继续的线程是那些想要使用相同的线程x
。使用条件,唤醒所有阻塞等待任何参数的.notify_all()
线程。但是除非在尝试使用相同参数的线程之间存在非常激烈的争用,否则这并不重要:使用条件,唤醒的线程不会等待保持清醒,只要足够长的时间才能看到这是真的,然后立即再次阻止 ( )。x
x in fargs
.wait()