我有一个 celery 任务,当被调用时,它会简单地在扭曲的反应器中启动一些并行代码的执行。这是一些示例(不可运行)代码来说明:
def run_task_in_reactor():
# this takes a while to run
do_something()
do_something_more()
@celery.task
def run_task():
print "Started reactor"
reactor.callFromThread(run_task_in_reactor)
(为简单起见,请假设worker收到任务时reactor已经在运行;我使用信号@worker_process_init.connect
在worker一上来就在另一个线程中启动我的reactor)
当我调用run_task.delay()
时,任务很快完成(因为它不等待run_task_in_reactor()
完成,只安排在反应器中执行)。并且,当run_task_in_reactor()
finally 运行时,do_something()
或者do_something_more()
可以抛出异常,这将被忽视。
例如,使用pika
从我的队列中消费,我可以使用内部的 ACKdo_something_more()
来使工作人员通知任务的正确完成。但是,在 Celery 内部,这似乎是不可能的(或者,至少,我不知道如何实现相同的效果)
此外,我无法移除反应堆,因为这是我正在使用的某些第三方代码的要求。实现相同结果的其他方法也值得赞赏。