1

我编写了一个基本实用程序,它在一个线程中侦听消息,将它们添加到 FIFO 队列并在另一个线程中处理它们。每条消息都需要固定的时间来处理(它正在等待闪烁的灯停止闪烁),但消息可以随机到达(patterns在代码中是一个正则表达式字典以匹配传入的消息,如果找到匹配项,则将其添加到队列以及要闪烁的颜色模式)。

blink_queue = Queue()
def receive(data) :
    message = data['text']

    for pattern in patterns:
        if re.match(pattern, message):
            blink_queue.put(patterns[pattern])
            break
    return True

def blinker(q) :
    while True:
        args = q.get().split()
        subprocess.Popen(
            [blink_app] + args,
            startupinfo=startupinfo,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE)
        time.sleep(blink_wait)
        q.task_done()

def subscribe():
    print("Listening for messages on '%s' channel..." % channel)
    pubnub.subscribe({
        'channel'  : channel,
        'callback' : receive
    })

blink_worker = Thread(target=blinker, args=(blink_queue,))
blink_worker.daemon=True
blink_worker.start()

sub_thread = Thread(target=subscribe)
sub_thread.daemon=True
sub_thread.start()

sub_thread.join()

如何在 Python 中实现一个 FIFO 队列,如果它变大,它会自动修剪最旧的(第一个)队列。我是创建另一个观看线程,还是在subscribe线程上检查大小?我是 Python 的新手,所以如果有一个完全合乎逻辑的数据类型,请随时称我为菜鸟,并把我送到正确的方向。

4

2 回答 2

7

原来有一个逻辑类型collections.deque。从文档中:

如果未指定 maxlen 或为 None,则双端队列可能会增长到任意长度。否则,双端队列限制为指定的最大长度。一旦有界长度的双端队列已满,当添加新项目时,相应数量的项目将从另一端丢弃。

是实现此数据类型的提交)

于 2012-12-14T22:20:15.437 回答
0

为此,如果队列太大,我将继承Queue并重载该put方法以按照您想要的方式删除项目。Queue

例如

class NukeOldDataQueue(Queue.Queue):
    def put(self,*args,**kwargs):
        if self.full():
            try:
                oldest_data = self.get()
                print('[WARNING]: throwing away old data:'+repr(oldest_data))
            # a True value from `full()` does not guarantee
            # that anything remains in the queue when `get()` is called
            except Queue.Empty:
                pass
        Queue.Queue.put(self,*args,**kwargs)

您可能还需要传递block=False参数或操作timeout参数,具体取决于意外丢弃新数据的严重程度或阻塞put()调用是否可以接受。

于 2012-12-14T20:50:27.880 回答