6

我最近编写了一个使用简单生产者/消费者模式的程序。它最初有一个与不正确使用 threading.Lock 相关的错误,我最终修复了该错误。但这让我思考是否有可能以无锁的方式实现生产者/消费者模式。

我的要求很简单:

  • 一个生产者线程。
  • 一个消费者线程。
  • 队列只能放置一项。
  • 生产者可以在当前项目被消耗之前生产下一个项目。因此,当前项目丢失了,但这对我来说没关系。
  • 消费者可以在生产下一个项目之前消费当前项目。因此,当前项目被消耗了两次(或更多),但这对我来说没关系。

所以我写了这个:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

我的问题是:这段代码是线程安全的吗?

立即评论:这段代码并不是真正无锁的——我使用 CPython,它有 GIL。

我对代码进行了一些测试,它似乎可以工作。它转换为一些由于 GIL 而具有原子性的 LOAD 和 STORE 操作。但我也知道当 x 实现方法del x时操作不是原子的。__del__因此,如果我的项目有一个__del__方法并且发生了一些令人讨厌的调度,事情可能会中断。或不?

另一个问题是:我必须施加什么样的限制(例如对生产项目的类型)才能使上述代码正常工作?

我的问题只是关于利用 CPython 和 GIL 的怪癖以提出无锁(即没有像 threading.Lock 这样的锁在代码中显式)解决方案的理论可能性。

4

6 回答 6

6

诡计会咬你。只需使用队列在线程之间进行通信。

于 2009-05-12T21:50:51.433 回答
2

是的,这将按照您描述的方式工作:

  1. 生产者可能会产生一个可跳过的元素。
  2. 消费者可能会消费相同的元素。

但我也知道当 x 实现del方法时 del x 操作不是原子的。因此,如果我的项目有一个del方法并且发生了一些令人讨厌的调度,事情可能会中断。

我在这里看不到“del”。如果在 consumer_item 中发生了 del,那么del可能发生在生产者线程中。我不认为这会是一个“问题”。

不过不要打扰使用它。你最终会在毫无意义的轮询周期中耗尽 CPU,而且它并不比使用带锁的队列快,因为 Python 已经有一个全局锁。

于 2009-05-13T02:03:19.270 回答
1

这并不是真正的线程安全,因为生产者可以QUEUE_ITEM在消费者消费之前覆盖它并且消费者可以消费QUEUE_ITEM两次。正如你所提到的,你可以接受,但大多数人不是。

对 cpython 内部有更多了解的人将不得不回答你更多的理论问题。

于 2009-05-12T21:31:55.553 回答
0

我认为在生产/消费时线程可能会被中断,特别是如果项目是大对象。编辑:这只是一个疯狂的猜测。我不是专家。

此外,线程可以在另一个开始运行之前产生/消耗任意数量的项目。

于 2009-05-12T21:32:56.987 回答
0

只要您坚持追加/弹出,您就可以使用列表作为队列,因为两者都是原子的。

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

在像下面这样的类作用域中,您甚至可以清除队列。

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

您必须通过查看生成的字节码找出哪些列表操作是原子的。

于 2009-05-12T21:34:05.080 回答
0

正如你所说,这__del__可能是一个问题。如果只有__del__在我们完成将新对象分配给QUEUE_ITEM. 我们需要类似的东西:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

我很害怕,但我不知道这是否可能。

于 2009-05-12T22:07:49.723 回答