9

有谁知道一种获得接近 LIFO 甚至不接近 FIFO(例如随机)行为的干净方法multiprocessing.Queue

替代问题:有人可以指出管理背后实际存储结构的线程代码multiprocessing.Queue吗?提供大约 LIFO 访问似乎是微不足道的,但我在试图找到它的兔子洞中迷路了。

笔记:

  1. 相信multiprocessing.Queue 不保证顺序。美好的。但它接近先进先出,所以接近 LIFO 会很棒。
  2. 在使用它们之前,我可以将所有当前项目从队列中拉出并颠倒顺序,但如果可能的话,我更愿意避免混乱。

(编辑)澄清:我正在做一个 CPU 绑定模拟,multiprocessing因此不能使用来自Queue. 由于我几天没有看到任何答案,所以我在上面添加了替代问题。


multiprocessing.Queue如果这是一个问题,下面是接近 FIFO的轻微证据。它只是表明在一个简单的情况下(单线程),它在我的系统上是完美的 FIFO:

import multiprocessing as mp
import Queue

q = mp.Queue()

for i in xrange(1000):
    q.put(i)

deltas = []
while True:
    try:
        value1 = q.get(timeout=0.1)
        value2 = q.get(timeout=0.1)
        deltas.append(value2-value1)
    except Queue.Empty:
        break

#positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas)/len(deltas)

print "min", min_delta
print "max", max_delta
print "avg", avg_delta

打印:最小值、最大值和平均值正好为 1(完美 FIFO)

4

2 回答 2

3

我查看了存在Lib/multiprocessing/queues.py于我的 Python 安装中的 Queue 类(Python 2.7,但在我简要检查的 Python 3.2 版本中没有什么明显的不同)。这是我理解它的工作原理:

Queue 对象维护了两组对象。一组是所有进程共享的多进程安全原语。其他的由每个进程单独创建和使用。

跨进程对象在__init__方法中设置:

  1. 一个Pipe对象,其两端保存为self._readerself._writer
  2. 一个BoundedSemaphore对象,它计算(并且可以选择限制)队列中有多少对象。
  3. 用于读取管道的Lock对象,在非 Windows 平台上用于写入的对象。(我认为这是因为在 Windows 上写入管道本质上是多进程安全的。)

每个进程的对象在_after_fork_start_thread方法中设置:

  1. collections.deque用于缓冲写入管道的对象。
  2. threading.condition用于在缓冲区不为空时发出信号的对象。
  3. 执行实际写入的threading.Thread对象。它是惰性创建的,因此在给定进程中至少请求一次写入队列之前,它不会存在。
  4. Finalize进程结束时清理东西的各种对象。

队列中的Aget非常简单。你获取读锁,减少信号量,并从管道的读端抓取一个对象。

Aput比较复杂。它使用多个线程。调用者获取put条件的锁,然后将其对象添加到缓冲区并在解锁之前发出条件信号。如果它还没有运行,它还会增加信号量并启动编写器线程。

编写器线程在方法中永远循环(直到取消)_feed。如果缓冲区为空,则等待notempty条件。然后它从缓冲区中取出一个项目,获取写锁(如果存在)并将该项目写入管道。


那么,考虑到所有这些,您可以修改它以获得 LIFO 队列吗?这似乎并不容易。管道本质上是 FIFO 对象,虽然 Queue 不能保证整体的 FIFO 行为(由于来自多个进程的写入的异步性质),但它总是主要是 FIFO。

如果您只有一个消费者,您可以get从队列中获取对象并将它们添加到您自己的进程本地堆栈中。做一个多消费者堆栈会更难,尽管使用共享内存,一个有限大小的堆栈不会太难。您需要一个锁、一对条件(用于在满状态和空状态下阻塞/发出信号)、一个共享整数值(用于持有的值的数量)和一个适当类型的共享数组(用于值本身)。

于 2012-08-21T19:48:17.963 回答
1

Queue 包中有一个LIFO 队列(Python 3 中的队列)。这未在 multiprocessing 或 multiprocessing.queues 模块中公开。

将您的行替换q = mp.Queue()q = Queue.LifoQueue()并运行打印:最小值、最大值和平均值正好为 -1。

(另外我认为从一个线程中获取项目时,您应该始终获得准确的 FIFO/LIFO 顺序。)

于 2012-08-21T14:10:45.260 回答