3

我有一个 celery 任务,当被调用时,它会简单地在扭曲的反应器中启动一些并行代码的执行。这是一些示例(不可运行)代码来说明:

def run_task_in_reactor():
   # this takes a while to run
   do_something()
   do_something_more()


@celery.task
def run_task():
   print "Started reactor"
   reactor.callFromThread(run_task_in_reactor)

(为简单起见,请假设worker收到任务时reactor已经在运行;我使用信号@worker_process_init.connect在worker一上来就在另一个线程中启动我的reactor)

当我调用run_task.delay()时,任务很快完成(因为它不等待run_task_in_reactor()完成,只安排在反应器中执行)。并且,当run_task_in_reactor()finally 运行时,do_something()或者do_something_more()可以抛出异常,这将被忽视。

例如,使用pika从我的队列中消费,我可以使用内部的 ACKdo_something_more()来使工作人员通知任务的正确完成。但是,在 Celery 内部,这似乎是不可能的(或者,至少,我不知道如何实现相同的效果)

此外,我无法移除反应堆,因为这是我正在使用的某些第三方代码的要求。实现相同结果的其他方法也值得赞赏。

4

1 回答 1

0

改为使用reactor.blockingCallFromThread

于 2015-05-18T23:55:32.453 回答