我刚刚用 Python 写了一个任务队列,它的作用是限制一次运行的任务数量。这有点不同,Queue.Queue
因为它不是限制队列中可以有多少项目,而是限制一次可以取出多少项目。它仍然使用 unboundedQueue.Queue
来完成它的工作,但它依赖于 aSemaphore
来限制线程数:
from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread
class TaskQueue(object):
"""
Queues tasks to be run in separate threads and limits the number
concurrently running tasks.
"""
def __init__(self, limit):
"""Initializes a new instance of a TaskQueue."""
self.__semaphore = BoundedSemaphore(limit)
self.__queue = Queue()
self.__cancelled = False
self.__lock = Lock()
def enqueue(self, callback):
"""Indicates that the given callback should be ran."""
self.__queue.put(callback)
def start(self):
"""Tells the task queue to start running the queued tasks."""
thread = Thread(target=self.__process_items)
thread.start()
def stop(self):
self.__cancel()
# prevent blocking on a semaphore.acquire
self.__semaphore.release()
# prevent blocking on a Queue.get
self.__queue.put(lambda: None)
def __cancel(self):
print 'canceling'
with self.__lock:
self.__cancelled = True
def __process_items(self):
while True:
# see if the queue has been stopped before blocking on acquire
if self.__is_canceled():
break
self.__semaphore.acquire()
# see if the queue has been stopped before blocking on get
if self.__is_canceled():
break
callback = self.__queue.get()
# see if the queue has been stopped before running the task
if self.__is_canceled():
break
def runTask():
try:
callback()
finally:
self.__semaphore.release()
thread = Thread(target=runTask)
thread.start()
self.__queue.task_done()
def __is_canceled(self):
with self.__lock:
return self.__cancelled
除非我明确停止任务队列,否则 Python 解释器将永远运行。这比我想象的要棘手得多。如果您查看该stop
方法,您会看到我在队列上设置了一个canceled
标志、release
信号量和put
一个无操作回调。最后两部分是必要的,因为代码可能会阻塞在Semaphore
或Queue
. 我基本上必须强迫这些通过,以便循环有机会爆发。
此代码有效。当运行一个试图并行运行数千个任务的服务时,这个类很有用。为了保持机器平稳运行并防止操作系统因过多的活动线程而尖叫,此代码将限制任何时间的线程数。
我之前用 C# 写过类似的代码块。使该代码特别“干”的原因是 .NET 有一个叫做 a 的东西CancellationToken
,几乎每个线程类都使用它。任何时候有阻塞操作,该操作都需要一个可选的令牌。如果父任务被取消,任何使用该令牌阻塞的子任务也将立即被取消。与通过释放信号量或将值放入队列来“伪造”它相比,这似乎是一种更干净的退出方式。
我想知道在 Python 中是否有等效的方法?我绝对想使用线程而不是异步事件之类的东西。我想知道是否有一种方法可以使用两个Queue.Queue
s 来实现相同的事情,其中一个具有最大尺寸,而另一个没有 - 但我仍然不确定如何处理取消。