27

我希望将 a 转储multiprocessing.Queue到列表中。对于该任务,我编写了以下函数:

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items \
            left" % (len(result), current_size))
            # END DEBUG CODE

            return result

但由于某种原因,它不起作用。

观察以下 shell 会话:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 

这里发生了什么事?为什么不是所有的物品都被倾倒?

4

2 回答 2

27

尝试这个:

import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)

多处理队列有一个内部缓冲区,该缓冲区具有一个馈线线程,该线程将工作从缓冲区中拉出并将其刷新到管道中。如果不是所有的对象都被刷新,我可以看到 Empty 过早引发的情况。使用哨兵来指示队列的结束是安全的(可靠的)。此外,使用 iter(get, sentinel) 习惯用法比依赖 Empty 更好。

我不喜欢它可能由于刷新时间而升空(我添加了 time.sleep(.1) 以允许上下文切换到馈线线程,您可能不需要它,没有它它可以工作 - 这是一种习惯释放 GIL)。

于 2009-10-08T23:36:18.317 回答
0
# in theory:
def dump_queue(q):
    q.put(None)
    return list(iter(q.get, None))

# in practice this might be more resilient:
def dump_queue(q):
    q.put(None)
    return list(iter(lambda : q.get(timeout=0.00001), None))

# but neither case handles all the ways things can break
# for that you need 'managers' and 'futures' ... see Commentary

我更喜欢None哨兵,但我倾向于同意 jnoller 的观点,即 mp.queue 可以使用安全且简单的哨兵。他对提早空仓风险的评论也是有效的,见下文。

评论:

这是旧的,Python 已经改变,但是,如果您在 MP Python 中遇到列表 <-> 队列问题,这确实会受到影响。所以,让我们再深入一点:

首先,这不是错误,而是一个功能:https ://bugs.python.org/issue20147 。为了节省您阅读文档中的讨论和更多详细信息的时间,这里有一些亮点(有点哲学,但我认为它可能会帮助一些在 Python 中开始使用 MP/MT 的人):

  • MP 队列是能够从同一系统上的不同线程、不同进程进行通信的结构,实际上可以是不同的(联网的)计算机
  • 一般来说,对于并行/分布式系统,严格同步的成本很高,因此每次您将 API 的一部分用于任何 MP/MT 数据结构时,您都需要查看文档以了解它承诺或不承诺做什么。提示:如果一个函数不包含“锁”或“信号量”或“屏障”等词,那么它将是“异步”和“尽力而为”(近似)的某种混合,或者你可能称之为“片状” 。”
  • 针对这种情况:Python 是一种解释型语言,具有著名的单解释器线程和著名的“全局解释器锁”(GIL)。如果你的整个程序是单进程、单线程的,那么一切都是笨拙的。如果不是(而对于 MP,则非常不),您需要给口译员一些喘息的空间。time.sleep()是你的朋友。在这种情况下,超时。

在您的解决方案中,您使用易碎函数 - get() 和 qsize()。而且代码实际上比您想象的要糟糕 - 增加队列的大小和对象的大小,您可能会破坏:

在此处输入图像描述

现在,您可以使用不稳定的例程,但您需要给它们留有回旋余地。在您的示例中,您只是在敲击该队列。您需要做的就是将线路thing = queue.get(block=False)改为 be thing = queue.get(block=True,timeout=0.00001),您应该没问题。

时间 0.00001 是经过仔细选择的(10^-5),它大约是您可以安全地做到的最小时间(这是艺术与科学相遇的地方)。

关于为什么需要超时的一些评论:这与 MP 队列如何工作的内部结构有关。当您将某些内容“放入” MP 队列时,它实际上并没有放入队列中,而是排队等待最终出现。这就是为什么qsize()碰巧给了你一个正确的结果——那部分代码知道队列中有一堆东西。您只需要意识到“在”队列中的对象与“我现在可以读取它”不同。将 MP 队列视为通过 USPS 或 FedEx 发送一封信——您可能有一张收据和一个跟踪号,显示“它在邮件中”,但收件人还不能打开它。现在,更具体地说,在您的情况下,您可以立即访问“0”项。那是因为您正在运行的单个解释器线程没有任何机会处理“排队”的东西,所以您的第一个循环只是为队列排队一堆东西,但您立即强制您的单线程尝试在它之前做一个 get()

有人可能会争辩说,它会减慢代码的速度以实现这些超时。不是真的 - MP 队列是重量级的结构,你应该只使用它们来传递相当重量级的“东西”,或者是大块数据,或者至少是复杂的计算。添加 10^-5 秒的行为实际上是让解释器有机会进行线程调度 - 此时它将看到您的备份put()操作。

警告

上述内容并不完全正确,这(可以说)是 get() 函数设计的问题。将 timeout 设置为非零的语义是 get() 函数在返回 Empty 之前不会阻塞更长的时间。但它实际上可能不是空的(还)。因此,如果您知道您的队列有很多东西要获取,那么上面的第二个解决方案效果更好,甚至超时时间更长。就我个人而言,我认为他们应该保持 timeout=0 的行为,但有一些 1e-5 的实际内置容差,因为很多人会对 MP 构造的 get 和 put 可能发生的事情感到困惑。

在您的示例代码中,您实际上并没有启动并行进程。如果我们这样做,那么您将开始得到一些随机结果 - 有时只会删除一些队列对象,有时会挂起,有时会崩溃,有时会发生不止一件事。在下面的示例中,一个进程崩溃而另一个挂起:

在此处输入图像描述

潜在的问题是,当您插入哨兵时,您需要知道队列已完成。应该完成队列的部分逻辑 - 例如,如果您有一个经典的主从设计,那么当添加最后一个任务时,主节点需要推送一个哨兵(结束)。否则,您最终会遇到竞争条件。

“正确”(有弹性)的方法是让经理期货参与:

import multiprocessing
import concurrent.futures

def fill_queue(q):
    for i in range(5000):
        q.put([range(200) for j in range(100)])

def dump_queue(q):
    q.put(None)
    return list(iter(q.get, None))

with multiprocessing.Manager() as manager:
    q = manager.Queue()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.submit(fill_queue, q)  # add stuff
        executor.submit(fill_queue, q)  # add more stuff
        executor.submit(fill_queue, q)  # ... and more
        
    # 'step out' of the executor
    l = dump_queue(q)

# 'step out' of the manager
print(f"Saw {len(l)} items")

让管理器处理您的 MP 构造(队列、字典等),并在其中让期货处理您的流程(并且在其中,如果您愿意,让另一个未来处理线程)。这样可以确保在您“解开”工作时将事情清理干净。

于 2021-09-07T23:24:12.467 回答