12

我怎样才能queue.Queue同时在多个上“选择”?

Golang 的频道具有所需的功能

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

其中第一个解除阻塞的通道执行相应的阻塞。我将如何在 Python 中实现这一点?

更新0

根据tux21b's answer中给出的链接,所需的队列类型具有以下属性:

  • 多生产者/多消费者队列 (MPMC)
  • 提供每个生产者的 FIFO/LIFO
  • 当队列为空/满时,消费者/生产者被阻塞

此外,渠道可能是阻塞的,生产者将阻塞,直到消费者检索到该项目。我不确定 Python 的 Queue 可以做到这一点。

4

4 回答 4

3

如果您使用queue.PriorityQueue,您可以使用通道对象作为优先级获得类似的行为:

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

示例输出:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

在这个例子中,ChannelManager它只是一个将方法queue.PriorityQueue实现为 a 的包装器,以使其看起来类似于Go 中的语句。selectcontextmanagerselect

需要注意的几点:

  • 订购

    • 在 Go 示例中,在select语句中写入通道的顺序决定了如果有多个通道可用的数据,将执行哪个通道的代码。

    • 在 python 示例中,顺序由分配给每个通道的优先级决定。但是,可以动态地将优先级分配给每个通道(如示例中所示),因此可以使用更复杂的select方法更改顺序,该方法负责根据方法的参数分配新的优先级。此外,一旦上下文管理器完成,旧的排序可以重新建立。

  • 阻塞

    • 在 Go 示例中,select如果存在案例,则该语句是阻塞的default

    • 在 python 示例中,必须将布尔参数传递给该select方法,以明确何时需要阻塞/非阻塞。在非阻塞情况下,上下文管理器返回的通道只是字符串,因此在内部代码中很容易在语句'default'内部的代码中检测到这一点。with

  • 线程:queue模块中的对象已经为多生产者、多消费者场景做好了准备,如示例中所示。

于 2011-12-10T13:10:20.017 回答
2

生产者-消费者队列有很多不同的实现,比如queue.Queue可用。它们通常在许多属性上有所不同,例如Dmitry Vyukov在这篇优秀文章中列出的属性。如您所见,可能有超过 10k 种不同的组合。用于此类队列的算法也因需求而异。仅仅扩展现有队列算法以保证附加属性是不可能的,因为这通常需要不同的内部数据结构和不同的算法。

Go 的频道提供了相对较多的保证属性,因此这些频道可能适用于很多节目。最难的要求之一是支持一次读取/阻塞多个通道(select 语句),并且如果 select 语句中的多个分支能够继续,则公平地选择一个通道,这样就不会留下任何消息. Python 的queue.Queue不提供此功能,因此根本不可能用它归档相同的行为。

因此,如果您想继续使用queue.Queue,您需要找到解决该问题的方法。然而,这些变通方法有它们自己的缺点列表,并且更难维护。寻找另一个提供您需要的功能的生产者-消费者队列可能是一个更好的主意!无论如何,这里有两种可能的解决方法:

轮询

while True:
  try:
    i1 = c1.get_nowait()
    print "received %s from c1" % i1
  except queue.Empty:
    pass
  try:
    i2 = c2.get_nowait()
    print "received %s from c2" % i2
  except queue.Empty:
    pass
  time.sleep(0.1)

这可能会在轮询通道时使用大量 CPU 周期,并且在有大量消息时可能会很慢。使用具有指数回退时间(而不是此处显示的恒定 0.1 秒)的 time.sleep() 可能会大大改善此版本。

单个通知队列

queue_id = notify.get()
if queue_id == 1:
  i1 = c1.get()
  print "received %s from c1" % i1
elif queue_id == 2:
  i2 = c2.get()
  print "received %s from c2" % i2

使用此设置,您必须在发送到 c1 或 c2 之后将某些内容发送到通知队列。这可能对您有用,只要只有一个这样的通知队列对您来说就足够了(即您没有多个“选择”,每个“选择”都阻塞在您的频道的不同子集上)。

或者,您也可以考虑使用 Go。无论如何,Go 的 goroutines 和并发支持比 Python 有限的线程功能强大得多。

于 2011-12-16T20:46:14.140 回答
2

pychan项目在 Python复制了 Go 通道,包括多路复用。它实现了与 Go 相同的算法,因此它满足您所有想要的属性:

  • 多个生产者和消费者可以通过一个 Chan 进行通信。当生产者和消费者都准备好时,他们会阻塞
  • 生产者和消费者按照到达的顺序获得服务 (FIFO)
  • 空(满)队列将阻塞消费者(生产者)。

您的示例如下所示:

c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise

(完全披露:我写了这个库)

于 2013-06-08T20:50:49.820 回答
1
from queue import Queue

# these imports needed for example code
from threading import Thread
from time import sleep
from random import randint

class MultiQueue(Queue):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queues = []

    def addQueue(self, queue):
        queue.put = self._put_notify(queue, queue.put)
        queue.put_nowait = self._put_notify(queue, queue.put_nowait)
        self.queues.append(queue)

    def _put_notify(self, queue, old_put):
        def wrapper(*args, **kwargs):
            result = old_put(*args, **kwargs)
            self.put(queue)
            return result
        return wrapper

if __name__ == '__main__':
    # an example of MultiQueue usage

    q1 = Queue()
    q1.name = 'q1'
    q2 = Queue()
    q2.name = 'q2'
    q3 = Queue()
    q3.name = 'q3'

    mq = MultiQueue()
    mq.addQueue(q1)
    mq.addQueue(q2)
    mq.addQueue(q3)

    queues = [q1, q2, q3]
    for i in range(9):
        def message(i=i):
            print("thread-%d starting..." % i)
            sleep(randint(1, 9))
            q = queues[i%3]
            q.put('thread-%d ending...' % i)
        Thread(target=message).start()

    print('awaiting results...')
    for _ in range(9):
        result = mq.get()
        print(result.name)
        print(result.get())

与其尝试使用.get()多个队列的方法,这里的想法是让队列在MultiQueue数据准备好时通知它们——有点select相反。这是通过MultiQueue包装各种Queue'sput()put_nowait()方法来实现的,以便当将某些内容添加到这些队列中时,该队列然后put()进入MultiQueue,并且相应的MultiQueue.get()将检索Queue已准备好数据的 。

MultiQueue基于 FIFO 队列,但您也可以根据需要使用 LIFO 或优先级队列作为基础。

于 2011-12-16T22:05:46.147 回答