1

我在两台不同的机器上运行 celery client(Flask) 和 worker,现在一旦 worker 完成任务,我需要在客户端回调一个函数。这可能吗?

芹菜客户:-

celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
result= testMethod1.apply_async((param1, param2,param3), link=testMethod2.s())

@celery_app.task
def testMethod2():
    #testMethod2 body.

芹菜工人:-

celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
@celery_app.task
def testMethod1():
   #testMethod1 body

但问题是函数 testMethod2 在 celery worker 端执行,而不是在客户端。

无论如何我可以在客户端回调该方法吗?

4

2 回答 2

0

我使用 celery 信号中的@after_task_publish 解决了这个问题。代码片段如下:-

@after_task_publish.connect(sender=<registered_celery_task>)
def testMethod2(sender=None, headers=None, body=None, **kwargs):
    #callback body

celery worker 在远程机器上完成后,将调用 testMethod2。在这里,我可以使用headers参数访问 celery worker 的结果。

于 2020-08-18T10:49:09.687 回答
0

一种方法是让 Celery 将其结果写入数据库表中,并使用 Flask 通过重复查询数据库来轮询任务的结果。类似的结构可能是在 Redis 中保存已完成任务的寄存器,但要点是相同的。

您想向用户触发完成消息吗?如果您可以通过电子邮件/短信通知,您当然可以让 Celery 处理。

如果你需要启动一些 Flask 进程——并且出于某种原因它确实需要在 Flask 的生态系统中——使用带有requests模块的 worker 来调用 Flask 正在监听的端点。

于 2020-08-14T13:08:46.343 回答