20

我有一个过程,需要“稍后”(通常在 10-60 秒后)执行一堆动作。问题是那些“稍后”的操作可能很多(1000 秒),因此使用Thread每个任务是不可行的。我知道geventeventlet等工具的存在,但问题之一是该过程使用zeromq进行通信,因此我需要一些集成(eventlet 已经拥有它)。

我想知道的是我的选择是什么?因此,欢迎提出建议,包括库(如果您使用过任何提到的,请分享您的经验)、技术(Python 的“协程”支持、使用一个休眠一段时间并检查队列的线程)、如何利用 zeromq 的 poll 或 eventloop 来完成这项工作,或者其他什么。

4

10 回答 10

19

考虑使用具有一个或多个工作线程的优先级队列来为任务提供服务。主线程可以将工作添加到队列中,并带有应该服务的最快时间戳。工作线程从队列中弹出工作,休眠直到达到优先级值的时间,完成工作,然后从队列中弹出另一个项目。

更充实的答案怎么样。mklauber 提出了一个很好的观点。如果当您有新的、更紧急的工作时,您的所有工作人员可能都在睡觉,那么 aqueue.PriorityQueue并不是真正的解决方案,尽管“优先队列”仍然是可以使用的技术,该技术可从heapq模块中获得。相反,我们将使用不同的同步原语;一个条件变量,在 python 中拼写为threading.Condition.

该方法相当简单,查看堆,如果工作是当前的,则将其弹出并执行该工作。如果有工作,但它被安排在未来,就等到那个时候,或者如果根本没有工作,就永远睡觉。

制作人完成了公平的工作;每次添加新工作时,它都会通知条件,因此如果有正在睡觉的工作人员,他们会醒来并重新检查队列以获取新工作。

import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()
def consumer(message):
    """the actual work function.  nevermind the locks here, this just keeps
       the output nicely formatted.  a real work function probably won't need
       it, or might need quite different synchronization"""
    SERIALIZE_STDOUT.acquire()
    print time.time() - START_TIME, message
    SERIALIZE_STDOUT.release()

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    condition.acquire()
    heapq.heappush(work_queue, (prio, message))
    condition.notify()
    condition.release()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    map(threading.Thread.start, pool)

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to stop after all
    # the work is done
    produce(work_queue, cond, float('inf'), 'stop')
    map(threading.Thread.join, pool)
于 2011-07-14T14:11:23.933 回答
11

这个答案实际上有两个建议——我的第一个和我在第一个之后发现的另一个。

预定的

我怀疑您正在寻找sched模块

编辑:我的建议在我阅读后似乎没什么帮助。所以我决定测试这个sched模块,看看它是否可以按照我的建议工作。我的测试来了:我会用一个单独的线程来使用它,或多或少这样:

class SchedulingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

首先,我将创建一个线程类,它有自己的调度程序和队列。调度程序中至少会注册一个事件:一个用于调用从队列中调度事件的方法。

class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

从队列中调度事件的方法将锁定队列,调度每个事件,清空队列并再次调度自身,以便在将来的某个时间寻找新的事件。请注意,寻找新事件的时间很短(一秒),您可以更改它:

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

该类还应该有一个调度用户事件的方法。自然,此方法应该在更新队列时锁定队列:

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

最后,该类应该调用调度程序的 main 方法:

    def run(self):
        self.scheduler.run()

下面是一个使用示例:

def print_time():
    print "scheduled:", time.time()


if __name__ == "__main__":
    st = SchedulingThread()
    st.start()          
    st.schedule(print_time, 10)

    while True:
        print "main thread:", time.time()
        time.sleep(5)

    st.join()

它在我的机器上的输出是:

$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77

这段代码只是一个简单的例子,它可能需要一些工作。但是,我不得不承认我对这个sched模块有点着迷,所以我建议它。您可能还想寻找其他建议:)

AP调度器

在谷歌中寻找我发布的解决方案,我发现了这个惊人的 APScheduler 模块。它非常实用和有用,我敢打赌它您的解决方案。使用此模块,我之前的示例会更简单:

from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)

def print_time():
    print "scheduled:", time.time()
    sch.unschedule_func(print_time)

while True:
    print "main thread:", time.time()
    time.sleep(5)

(不幸的是我没有找到如何安排一个事件只执行一次,所以函数事件应该自行取消安排。我敢打赌它可以用一些装饰器来解决。)

于 2011-07-19T15:00:47.370 回答
7

如果您有一堆需要稍后执行的任务,并且即使您关闭调用程序或您的工作人员,您也希望它们持续存在,您应该真正研究一下Celery,这使得创建新任务变得非常容易,有它们在您想要的任何机器上执行,然后等待结果。

来自 Celery 页面,“这是一个简单的添加两个数字的任务:”

from celery.task import task

@task
def add(x, y):
    return x + y

您可以在后台执行任务,或等待它完成:

>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16
于 2011-07-25T23:41:01.573 回答
3

你写了:

问题之一是该进程使用 zeromq 进行通信,所以我需要一些集成(eventlet 已经拥有它)

似乎你的选择会受到这些细节的严重影响,这些细节有点不清楚——zeromq 是如何用于通信的,集成需要多少资源,以及你的要求和可用资源是什么。


有一个名为django-ztask的项目,它使用zeromq并提供了一个task类似于 celery 的装饰器。但是,它(显然)是特定于 Django 的,因此可能不适合您的情况。我没用过,我自己更喜欢芹菜

已经在几个项目中使用了 celery(这些项目托管在ep.io PaaS 托管,它提供了一种简单的使用方法)。

Celery 看起来是一个非常灵活的解决方案,允许延迟任务、回调、任务过期和重试、限制任务执行率等。它可以与 Redis、Beanstalk、CouchDB、MongoDB 或 SQL 数据库一起使用。

示例代码(定义任务和延迟后的异步执行):

from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before queueing the task.
)

另请参阅celery 文档中的部分

于 2011-07-26T09:44:51.803 回答
2

你看过multiprocessing模块吗?它是 Python 的标准配置。它类似于threading模块,但在一个进程中运行每个任务。您可以使用Pool()对象来设置工作池,然后使用该.map()方法调用具有各种排队任务参数的函数。

于 2011-07-20T02:54:11.793 回答
1

Pyzmq有一个ioloop与 tornado ioloop 类似的 api 实现。它实现了一个DelayedCallback可以帮助你的。

于 2011-07-24T10:03:39.067 回答
0

假设您的进程有一个可以接收信号的运行循环,并且每个动作的时间长度在顺序操作的范围内,请使用信号和 posix alarm()

    signal.alarm(time)
If time is non-zero, this function requests that a 
SIGALRM signal be sent to the process in time seconds. 

这取决于您所说的“那些“以后的”操作可能很多”以及您的流程是否已经使用信号。由于问题的措辞,不清楚为什么需要外部 python 包。

于 2011-07-20T22:17:07.323 回答
0

另一种选择是使用Phyton GLib 绑定​​,尤其是它的timeout功能。

只要你不想使用多核并且对GLib的依赖没有问题,这是一个不错的选择。它处理同一线程中的所有事件,从而防止同步问题。此外,它的事件框架还可以用来观察和处理基于 IO(即套接字)的事件。

更新:

这是使用 GLib 的实时会话:

>>> import time
>>> import glib
>>> 
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
... 
>>> ml = glib.MainLoop()
>>> 
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>> 
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this
于 2011-07-22T13:52:22.003 回答
0

那么在我看来,你可以使用一种叫做“合作多任务处理”的东西。这是基于扭曲的东西,它真的很酷。看看 2010 年的 PyCon 演示文稿:http: //blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11- 3352182

好吧,您也需要传输队列来执行此操作...

于 2011-07-25T14:49:43.257 回答
0

简单的。您可以从 Thread 继承您的类并使用类似超时的参数创建您的类的实例,因此对于您的类的每个实例,您可以说超时,这将使您的线程等待那个时间

于 2011-07-26T10:36:14.510 回答