2

我刚刚用 Python 写了一个任务队列,它的作用是限制一次运行的任务数量。这有点不同,Queue.Queue因为它不是限制队列中可以有多少项目,而是限制一次可以取出多少项目。它仍然使用 unboundedQueue.Queue来完成它的工作,但它依赖于 aSemaphore来限制线程数:

from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread


class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    concurrently running tasks.

    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__semaphore = BoundedSemaphore(limit)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()

    def enqueue(self, callback):
        """Indicates that the given callback should be ran."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        thread = Thread(target=self.__process_items)
        thread.start()

    def stop(self):
        self.__cancel()
        # prevent blocking on a semaphore.acquire
        self.__semaphore.release()
        # prevent blocking on a Queue.get
        self.__queue.put(lambda: None)

    def __cancel(self):
        print 'canceling'
        with self.__lock:
            self.__cancelled = True

    def __process_items(self):
        while True:
            # see if the queue has been stopped before blocking on acquire
            if self.__is_canceled():
                break

            self.__semaphore.acquire()

            # see if the queue has been stopped before blocking on get
            if self.__is_canceled():
                break

            callback = self.__queue.get()

            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            def runTask():
                try:
                    callback()
                finally:
                    self.__semaphore.release()

            thread = Thread(target=runTask)
            thread.start()
            self.__queue.task_done()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

除非我明确停止任务队列,否则 Python 解释器将永远运行。这比我想象的要棘手得多。如果您查看该stop方法,您会看到我在队列上设置了一个canceled标志、release信号量和put一个无操作回调。最后两部分是必要的,因为代码可能会阻塞在SemaphoreQueue. 我基本上必须强迫这些通过,以便循环有机会爆发。

此代码有效。当运行一个试图并行运行数千个任务的服务时,这个类很有用。为了保持机器平稳运行并防止操作系统因过多的活动线程而尖叫,此代码将限制任何时间的线程数。

我之前用 C# 写过类似的代码块。使该代码特别“干”的原因是 .NET 有一个叫做 a 的东西CancellationToken,几乎每个线程类都使用它。任何时候有阻塞操作,该操作都需要一个可选的令牌。如果父任务被取消,任何使用该令牌阻塞的子任务也将立即被取消。与通过释放信号量或将值放入队列来“伪造”它相比,这似乎是一种更干净的退出方式。

我想知道在 Python 中是否有等效的方法?我绝对想使用线程而不是异步事件之类的东西。我想知道是否有一种方法可以使用两个Queue.Queues 来实现相同的事情,其中​​一个具有最大尺寸,而另一个没有 - 但我仍然不确定如何处理取消。

4

3 回答 3

4

我认为您的代码可以通过使用中毒和来简化Thread.join()

from Queue import Queue
from threading import Thread

poison = object()

class TaskQueue(object):

    def __init__(self, limit):
        def process_items():
            while True:
                callback = self._queue.get()
                if callback is poison:
                    break
                try:
                    callback()
                except:
                    pass
                finally:
                    self._queue.task_done()
        self._workers = [Thread(target=process_items) for _ in range(limit)]
        self._queue = Queue()

    def enqueue(self, callback):
        self._queue.put(callback)

    def start(self):
        for worker in self._workers:
            worker.start()

    def stop(self):
        for worker in self._workers:
            self._queue.put(poison)
        while self._workers:
            self._workers.pop().join()

未经测试。

为简洁起见,我删除了评论。

此外,在这个版本process_items()中是真正的私人。

顺便说一句:该Queue模块的重点是让您摆脱可怕的锁定和事件。

于 2012-09-17T02:52:09.577 回答
1

您似乎正在为队列中的每个任务创建一个新线程。这本身就是一种浪费,同时也会给你带来如何限制线程数的问题。

相反,一种常见的方法是创建固定数量的工作线程,让它们自由地从队列中拉取任务。要取消队列,您可以清除它并让工作人员保持活力以期待未来的工作。

于 2012-09-16T19:02:16.820 回答
0

我接受了 Janne Karila 的建议并创建了一个线程池。这消除了对信号量的需要。问题是如果您希望队列消失,您必须停止工作线程运行(只是我之前所做的一个变体)。新代码非常相似:

class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    concurrently running tasks.

    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__workers = []
        for _ in range(limit):
            worker = Thread(target=self.__process_items)
            self.__workers.append(worker)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()
        self.__event = Event()

    def enqueue(self, callback):
        """Indicates that the given callback should be ran."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        for worker in self.__workers:
            worker.start()

    def stop(self):
        """
        Stops the queue from processing anymore tasks. Any actively running
        tasks will run to completion.

        """
        self.__cancel()
        # prevent blocking on a Queue.get
        for _ in range(len(self.__workers)):
            self.__queue.put(lambda: None)
            self.__event.wait()

    def __cancel(self):
        with self.__lock:
            self.__queue.queue.clear()
            self.__cancelled = True

    def __process_items(self):
        while True:
            callback = self.__queue.get()

            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            try:
                callback()
            except:
                pass
            finally:
                self.__queue.task_done()
        self.__event.set()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

如果你仔细看,我不得不做一些会计来杀死工人。我基本上等了Event和工人一样多的时间。我clear是底层队列,以防止工人以任何其他方式被取消。在将每个虚假值注入队列后,我也会等待,因此一次只能有一个工作人员取消。

我已经对此进行了一些测试,它似乎正在工作。消除对虚假值的需求仍然很好。

于 2012-09-17T02:12:46.290 回答