我实现了一个生产者/消费者优先级队列,其中的优先级实际上是一个时间戳,表示该项应在何时交付给消费者。它运行良好,但我想知道是否有人有更好的实现思路,或者对当前实现有什么意见。
代码用 Python 编写。队列内部会创建一个线程,用于按时唤醒正在等待的消费者。我知道在库中创建线程是一种反模式,但我想不出其他办法。
这是代码:
import collections
import heapq
import threading
import time
class TimelyQueue(threading.Thread):
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.

    Producers call ``put(when, item)`` where ``when`` is a ``time.time()``
    style timestamp.  The thread body (``run``) moves every item whose
    timestamp has been reached (within ``_RESOLUTION`` seconds) from the
    pending heap to the ``triggered`` deque and wakes the consumers blocked
    in ``get()``.  The thread is a daemon and must be started with
    ``start()`` before any item can be delivered.

    Items with the same timestamp are delivered in insertion order and are
    never compared with each other, so they do not need to be orderable.
    """

    class Locker:
        """
        Context manager that acquires ``lock`` on entry and releases it on
        exit.

        Kept for backward compatibility only: the queue itself relies on
        the context-manager support built into ``threading.Condition``.
        """

        def __init__(self, lock):
            self.l = lock

        def __enter__(self):
            self.l.acquire()
            return self.l

        def __exit__(self, exc_type, value, traceback):
            self.l.release()

    # Optimization to avoid wasting CPU cycles when something
    # is about to happen in less than 5 ms.
    _RESOLUTION = 0.005

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        # Heap of (when, sequence, item) entries that are not due yet.
        self.queue = []
        # FIFO of (when, item) pairs that are due and ready for get().
        self.triggered = collections.deque()
        # Guards `queue`, `putwaketime` and `_sequence`.
        self.putcond = threading.Condition()
        # Guards `triggered`; consumers wait on it.
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly: timestamp the
        # thread is currently sleeping for, or 0 when the heap was empty.
        self.putwaketime = 0
        # Tie-breaker so that heap entries with equal timestamps never fall
        # back to comparing the items themselves.
        self._sequence = 0

    def put(self, when, item):
        """
        Schedule ``item`` for delivery at timestamp ``when``.

        ``when`` is compared against ``time.time()`` by the delivery
        thread.  The thread is only woken when the new item is due earlier
        than what it is currently sleeping for, or when it sleeps without a
        deadline.
        """
        with self.putcond:
            self._sequence += 1
            heapq.heappush(self.queue, (when, self._sequence, item))
            if self.putwaketime == 0 or when < self.putwaketime:
                self.putcond.notify()

    def get(self, timeout=None):
        """
        Return the next due item, blocking until one is available.

        With ``timeout=None`` the call blocks until an item is delivered.
        Otherwise it waits at most ``timeout`` seconds and returns ``None``
        when nothing became due in time.
        """
        # Prefer a monotonic clock for the deadline when the interpreter
        # provides one, so wall-clock adjustments do not stretch the wait.
        clock = getattr(time, "monotonic", time.time)
        deadline = None if timeout is None else clock() + timeout
        with self.getcond:
            # Loop: a wakeup does not guarantee an item is still there
            # (spurious wakeup, or another consumer took it first).
            while not self.triggered:
                if deadline is None:
                    self.getcond.wait()
                    continue
                remaining = deadline - clock()
                if remaining <= 0:
                    return None
                self.getcond.wait(remaining)
            when, item = self.triggered.popleft()
            return item

    def qsize(self):
        """Return the number of items that are scheduled but not yet due."""
        with self.putcond:
            return len(self.queue)

    def run(self):
        """
        Thread body: sleep until the earliest item is due, then deliver it.

        Runs forever with ``putcond`` held, except while waiting on it.
        """
        with self.putcond:
            while True:
                if self.queue:
                    when = self.queue[0][0]
                    self.putwaketime = when
                    # Never pass a negative timeout for overdue items.
                    maxwait = max(0.0, when - time.time())
                else:
                    self.putwaketime = 0
                    maxwait = None
                self.putcond.wait(maxwait)
                self._deliver_due(time.time())

    def _deliver_due(self, now):
        """
        Move every item due at ``now`` (plus ``_RESOLUTION``) to
        ``triggered`` and wake the waiting consumers.

        The caller must hold ``putcond``.  ``getcond`` is always taken
        after ``putcond``, never the other way round.
        """
        horizon = now + self._RESOLUTION
        with self.getcond:
            while self.queue and self.queue[0][0] <= horizon:
                when, _sequence, item = heapq.heappop(self.queue)
                self.triggered.append((when, item))
            if self.triggered:
                # Several items may be ready: wake every consumer and let
                # each one re-check the deque.
                self.getcond.notify_all()
if __name__ == "__main__":
    # Small benchmark: schedule N items two seconds ahead, then drain them
    # and report the put and get throughput.
    q = TimelyQueue()
    q.start()
    N = 50000

    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    # Single-argument parenthesized print behaves the same on Python 2
    # and Python 3.
    print("put done in %.3fs (%.2f put/sec)" % (dt, N / dt))

    t0 = time.time()
    i = 0
    while i < N:
        # NOTE: a get() that times out returns None and is still counted.
        q.get(3)
        if i == 0:
            # The first get blocks until the items become due; report that
            # delay and restart the clock so it is excluded from the rate.
            dt = time.time() - t0
            print("start get after %.3fs" % dt)
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print("get done in %.3fs (%.2f get/sec)" % (dt, N / dt))