2

我已经实现了一个消费者/生产者优先级队列,其中优先级实际上是一个时间戳,表示何时应该交付项目。它工作得很好,但我想知道是否有人有更好的想法来实现它或对当前实现发表评论。

代码在 Python 中。创建单个线程以按时唤醒等待的消费者。我知道这是在库中创建线程的反模式,但我无法设计另一种方法。

这是代码:

import collections
import heapq
import threading
import time

class TimelyQueue(threading.Thread):
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    class Locker:
        def __init__(self, lock):
            self.l = lock
        def __enter__(self):
            self.l.acquire()
            return self.l
        def __exit__(self, type, value, traceback):
            self.l.release()

    # Optimization to avoid wasting CPU cycles when something
    # is about to happen in less than 5 ms.
    _RESOLUTION = 0.005

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0

    def put(self, when, item):
        with self.Locker(self.putcond):
            heapq.heappush(self.queue, (when, item))
            if when < self.putwaketime or self.putwaketime == 0:
                self.putcond.notify()

    def get(self, timeout=None):
        with self.Locker(self.getcond):
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
                self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        with self.Locker(self.putcond):
            return len(self.queue)

    def run(self):
        with self.Locker(self.putcond):
            maxwait = None
            while True:
                curtime = time.time()
                try:
                    when, item = self.queue[0]
                    maxwait = when - curtime
                    self.putwaketime = when
                except IndexError:
                    maxwait = None
                    self.putwaketime = 0
                self.putcond.wait(maxwait)

                curtime = time.time()
                while True:
                    # Don't dequeue now, we are not sure to use it yet.
                    try:
                        when, item = self.queue[0]
                    except IndexError:
                        break
                    if when > curtime + self._RESOLUTION:
                        break

                    self.triggered.append(heapq.heappop(self.queue))
                if len(self.triggered) > 0:
                    with self.Locker(self.getcond):
                        self.getcond.notify()


if __name__ == "__main__":
    q = TimelyQueue()
    q.start()

    N = 50000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
4

2 回答 2

0

这里唯一真正需要后台线程的就是一个计时器,它可以在服务员用完时踢出服务员,对吧?

首先,您可以使用threading.Timer而不是显式后台线程来实现它。但是,虽然这可能更简单,但它并不能真正解决您在用户背后创建线程的问题,无论他们是否愿意。此外,使用threading.Timer,您实际上每次重新启动计时器时都会关闭一个新线程,这可能是一个性能问题。(您一次只有一个,但启动和停止线程仍然不是免费的。)

如果您查看 PyPI 模块、ActiveState 配方和各种框架,有许多实现可以让您在单个后台线程上运行多个计时器。那会解决你的问题。

但这仍然不是一个完美的解决方案。例如,假设我的应用程序需要 20 个TimelyQueue对象——或者TimelyQueue再加上 19 个其他都需要计时器的东西。我仍然会得到 20 个线程。或者,假设我正在构建一个套接字服务器或一个 GUI 应用程序(您的两个最明显的用例TimelyQueue;我可以在我的事件循环之上实现一个计时器(或者,很可能只使用框架),那么为什么我需要一个线程呢?

解决这个问题的方法是提供一个钩子来提供任何计时器工厂:

def __init__(self, timerfactory = threading.Timer):
    self.timerfactory = timerfactory
    ...

现在,当您需要调整计时器时:

if when < self.waketime:
    self.timer.cancel()
    self.timer = self.timerfactory(when - now(), self.timercallback)
    self.waketime = when

对于快速和肮脏的用例,开箱即用就足够了。但是,如果我正在使用twisted,我可以只使用TimelyQueue(twisted.reactor.callLater),现在队列的计时器将通过twisted事件循环。或者,如果我有一个在其他地方使用的多定时器单线程实现TimelyQueue(multiTimer.add),那么现在队列的定时器与我的所有其他定时器在同一个线程上运行。

如果您愿意,您可以提供比 更好的默认值threading.Timer,但实际上,我认为大多数需要比 更好threading.Timer的东西的人能够为他们的特定应用程序提供比您提供的任何东西更好的东西。

当然,并非每个计时器实现都具有与threading.Timer-- 尽管您会惊讶于其中有多少实现相同的 API。但是编写适配器并不难,如果您有一个要使用的计时器,TimelyQueue但它的接口错误。例如,如果我正在构建一个 PyQt4/PySide 应用程序,QTimer没有cancel方法,并且需要毫秒而不是秒,所以我必须做这样的事情:

class AdaptedQTimer(object):
    def __init__(self, timeout, callback):
        self.timer = QTimer.singleShot(timeout * 1000, callback)
    def cancel(self):
        self.timer.stop()

q = TimelyQueue(AdaptedQTimer)

或者,如果我想QObject更直接地将队列集成到一个队列中,我可以结束QObject.startTimer()并让我的timerEvent(self)方法调用回调。

一旦你考虑适配器,最后一个想法。我认为这不值得,但可能值得考虑。如果您的计时器采用时间戳而不是 timedelta,并且有一个adjust方法而不是/而不是 a cancel,并且拥有自己的waketime,那么您的TimelyQueue实现可能会更简单,并且可能更有效。在put中,你有这样的东西:

if self.timer is None:
    self.timer = self.timerfactory(when)
elif when < self.timer.waketime:
    self.timer.adjust(when)

当然大多数定时器不提供这个接口。但是,如果有人拥有或愿意制作一个,他们可以获得好处。对于其他所有人,您可以提供一个简单的适配器,将threading.Timer-style 计时器转换为您需要的类型,例如:

def timerFactoryAdapter(threadingStyleTimerFactory):
    class TimerFactory(object):
        def __init__(self, timestamp, callback):
            self.timer = threadingStyleTimerFactory(timestamp - now(), callback)
            self.callback = callback
        def cancel(self):
            return self.timer.cancel()
        def adjust(self, timestamp):
            self.timer.cancel()
            self.timer = threadingStyleTimerFactory(timestamp - now(), self.callback)
于 2012-12-05T23:25:59.990 回答
0

作为记录,我已经使用计时器工厂实现了您提出的建议。我使用上面的版本和使用threading.Timer类的新版本运行了一个小基准测试:

  1. 首次实施

    • 使用默认分辨率(5 ms,即 5 ms 窗口内的所有内容一起触发),它可以达到大约 88k put()/sec 和 69k get()/sec。

    • 分辨率设置为 0 毫秒(无优化)时,它达到了大约 88k put()/sec 和 55k get()/sec。

  2. 第二次实施

    • 使用默认分辨率(5 ms),它可以达到大约 88k put()/sec 和 65k get()/sec。

    • 分辨率设置为 0 ms 时,它可以达到大约 88k put()/sec 和 62k get()/sec。

我承认我很惊讶第二个实现在没有分辨率优化的情况下更快。现在调查为时已晚。

import collections
import heapq
import threading
import time

class TimelyQueue:
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    def __init__(self, resolution=5, timerfactory=threading.Timer):
        """
        `resolution' is an optimization to avoid wasting CPU cycles when
        something is about to happen in less than X ms.
        """
        self.resolution = float(resolution) / 1000
        self.timerfactory = timerfactory
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0
        self.timer = None
        self.terminating = False

    def __arm(self):
        """
        Arm the next timer; putcond must be acquired!
        """
        curtime = time.time()
        when, item = self.queue[0]
        interval = when - curtime
        self.putwaketime = when
        self.timer = self.timerfactory(interval, self.__fire)
        self.timer.start()

    def __fire(self):
        with self.putcond:
            curtime = time.time()
            debug = 0
            while True:
                # Don't dequeue now, we are not sure to use it yet.
                try:
                    when, item = self.queue[0]
                except IndexError:
                    break
                if when > curtime + self.resolution:
                    break

                debug += 1
                self.triggered.append(heapq.heappop(self.queue))
            if len(self.triggered) > 0:
                with self.getcond:
                    self.getcond.notify(len(self.triggered))
            if self.terminating:
                return
            if len(self.queue) > 0:
                self.__arm()

    def put(self, when, item):
        """
        `when' is a Unix time from Epoch.
        """
        with self.putcond:
            heapq.heappush(self.queue, (when, item))
            if when >= self.putwaketime and self.putwaketime != 0:
                return
            # Arm next timer.
            if self.timer is not None:
                self.timer.cancel()
            self.__arm()

    def get(self, timeout=None):
        """
        Timely return the next object on the queue.
        """
        with self.getcond:
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
            self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        """
        Self explanatory.
        """
        with self.putcond:
            return len(self.queue)

    def terminate(self):
        """
        Request the embedded thread to terminate.
        """
        with self.putcond:
            self.terminating = True
            if self.timer is not None:
                self.timer.cancel()
            self.putcond.notifyAll()


if __name__ == "__main__":
    q = TimelyQueue(0)
    N = 100000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
    q.terminate()
    # Give change to the thread to exit properly, otherwise we may get
    # a stray interpreter exception.
    time.sleep(0.1)
于 2012-12-13T23:26:37.370 回答