6

我有一个函数,永远不能从两个线程同时调用相同的值。为了强制执行这一点,我有一个为给定键defaultdict生成 newthreading.Lock的 s。因此,我的代码看起来类似于:

from collections import defaultdict
import threading

lock_dict = defaultdict(threading.Lock)
def f(x):
    with lock_dict[x]:
        print "Locked for value x"

问题是,一旦不再需要,我无法弄清楚如何安全地从 defaultdict 中删除锁。如果不这样做,我的程序会出现内存泄漏,当f使用许多不同的 x 值调用时会变得很明显。

我不能简单地del lock_dict[x]在 f 结束时,因为在另一个线程正在等待锁的情况下,第二个线程将锁定一个不再与 lock_dict[x] 关联的锁,因此两个线程可能最终同时f调用x 的值相同。

4

1 回答 1

7

我会使用不同的方法:

fcond = threading.Condition()
fargs = set()

def f(x):
    with fcond:
        while x in fargs:
            fcond.wait()
        fargs.add(x)  # this thread has exclusive rights to use `x`

    # do useful stuff with x
    # any other thread trying to call f(x) will
    # block in the .wait above()

    with fcond:
        fargs.remove(x)      # we're done with x
        fcond.notify_all()   # let blocked threads (if any) proceed

条件有一个学习曲线,但是一旦爬上去,它们就可以更容易地编写正确的线程安全、无竞争的代码。

原代码的线程安全

@JimMischel 在评论中询问原始的使用是否defaultdict受种族影响。好问题!

答案是 - 唉 - “你将不得不盯着你的特定 Python 的实现”。

假设 CPython 实现:如果提供默认值的任何代码defaultdict调用 Python 代码或释放 GIL(全局解释器锁)的 C 代码,则 2 个(或更多)线程可以“同时”调用withlock_dict[x]相同的x尚未在字典中,并且:

  1. 线程 1 看到它x不在字典中,获得锁,然后丢失它的时间片(在x字典中设置之前)。
  2. 线程 2 看到它x不在字典中,并且还获得了锁。
  3. 其中一个线程的锁最终在字典中,但两个线程都执行f(x)

盯着 3.4.0a4+(当前开发负责人)的源代码,defaultdict并且threading.Lock都是由不发布 GIL 的 C 代码实现的。我不记得早期版本是否在不同时间实现了全部或部分Pythondefaultdictthreading.Lock在 Python 中实现。

我建议的替代代码充满了用 Python 实现的东西(所有threading.Condition方法),但在设计上是无竞争的——即使你使用的是旧版本的 Python,集合也在 Python 中实现(只有在保护下才能访问集合条件变量的锁)。

每个参数一个锁

如果没有条件,这似乎要困难得多。在最初的方法中,我相信您需要保留想要使用的线程计数x,并且您需要一个锁来保护这些计数并保护字典。我为此想出的最好的代码是如此冗长,以至于将它放在上下文管理器中似乎是最明智的。要使用,请为每个需要它的函数创建一个参数储物柜:

farglocker = ArgLocker() # for function `f()`

然后f()可以简单地对主体进行编码:

def f(x):
    with farglocker(x):
        # only one thread at a time can run with argument `x`

当然,条件方法也可以包装在上下文管理器中。这是代码:

import threading

class ArgLocker:
    def __init__(self):
        self.xs = dict() # maps x to (lock, count) pair
        self.lock = threading.Lock()

    def __call__(self, x):
        return AllMine(self.xs, self.lock, x)

class AllMine:
    def __init__(self, xs, lock, x):
        self.xs = xs
        self.lock = lock
        self.x = x

    def __enter__(self):
        x = self.x
        with self.lock:
            xlock = self.xs.get(x)
            if xlock is None:
                xlock = threading.Lock()
                xlock.acquire()
                count = 0
            else:
                xlock, count = xlock
            self.xs[x] = xlock, count + 1

        if count: # x was already known - wait for it
            xlock.acquire()
        assert xlock.locked

    def __exit__(self, *args):
        x = self.x
        with self.lock:
            xlock, count = self.xs[x]
            assert xlock.locked
            assert count > 0
            count -= 1
            if count:
                self.xs[x] = xlock, count
            else:
                del self.xs[x]
            xlock.release()

那么哪种方式更好呢?使用条件 ;-) 这种方式“几乎显然是正确的”,但每个参数锁定(LPA)方法有点让人头疼。LPA 方法确实有一个优点,当一个线程完成后x唯一允许继续的线程是那些想要使用相同的线程x。使用条件,唤醒所有阻塞等待任何参数的.notify_all()线程。但是除非在尝试使用相同参数的线程之间存在非常激烈的争用,否则这并不重要:使用条件,唤醒的线程不会等待保持清醒,只要足够长的时间才能看到这是真的,然后立即再次阻止 ( )。xx in fargs.wait()

于 2013-11-06T05:28:36.260 回答