我在 python 中使用队列,它的工作原理类似于通道。也就是说,每当您进行插入时,其他一些线程正在等待并获取插入的值。这个值就是产量。
@classsynchronized('mutex')
def send(self, message):
    # This might raise Full
    if not self._is_closed:
        self._queue.put(message)
        return True
    return False
@classsynchronized('mutex')
def close(self):
    # Don't do what we don't need to
    if self._is_closed:
        return
    # Make the _queue.get() request fail with an Empty exception
    # This will cause the channel to stop listenning to messages
    # First aquire the write lock, then notify the read lock and
    # finally release the write lock. This is equivalent to an
    # empty write, which will cause the Empty exception
    print("ACQUIRING not_full")
    self._queue.not_full.acquire()
    # Close first. If the queue is empty it will raise Empty as fast as
    # possible, instead of waiting for the timeout
    self._is_closed = True
    try:
        print("NOTIFYING not_empty")
        self._queue.not_empty.notify()
        print("NOTIFIED not_empty")
    finally:
        self._queue.not_full.release()
        print("RELEASED not_full")
def _yield_response(self):
    try:
        while True:
            # Fetch from the queue, wait until available, or a timeout
            timeout = self.get_timeout()
            print("[WAITING]")
            message = self._queue.get(True, timeout)
            print("[DONE WAITING] " + message)
            self._queue.task_done()
            # Don't yield messages on closed queues, flush instead
            # This prevents writting to a closed stream, but it's
            # up to the user to close the queue before closing the stream
            if not self._is_closed:
                yield message
            # The queue is closed, ignore all remaining messages
            # Allow subclasses to change the way ignored messages are handled
            else:
                self.handle_ignored(message)
    # This exception will be thrown when the channel is closed or
    # when it times out. Close the channel, in case a timeout caused
    # an exception
    except Empty:
        pass
    # Make sure the channel is closed, we can get here by timeout
    self.close()
    # Finally, empty the queue ignoring all remaining messages
    try:
        while True:
            message = self._queue.get_nowait()
            self.handle_ignored(message)
    except Empty:
        pass
我只包括了相关的方法,但请注意这是一个类。问题是,这并不像我预期的那样。队列确实关闭了,所有打印都显示在控制台中,但是等待消息的线程没有得到通知。相反,它总是以超时退出。
所有@classsynchronized('mutex') 注释都在类方面同步具有相同标识符('mutex')的方法,也就是说,具有相同ID 的注释的类中的每个方法都相互同步。
我在关闭之前获取 not_full 锁的原因是为了防止插入关闭的通道。只有这样我才会通知 not_empty 锁。
知道为什么这不起作用吗?还有其他建议吗?
提前致谢。
编辑:
我对印刷品做了一些改动。我创建频道并立即发送消息。然后我发送一个 HTTP 请求来删除它。这是输出:
[WAITING]
[DONE WAITING] message
[WAITING]
ACQUIRING not_full
NOTIFYING not_empty
NOTIFIED not_empty
RELEASE not_full
所以:
- 第一条消息被处理并成功发送(我在客户端得到它,所以......)
 - 然后队列正在等待。它应该在 not_empty 锁上等待,对吧?
 - 我对频道发出 DELETE 请求。它获取 not_full 锁(以防止写入),并通知 not_empty 锁。
 
我真的不明白...如果线程得到通知,为什么它不解锁?