3

我正在尝试让发件人过滤器工作,例如

@celery.task
def run_timer(crawl_start_time):
    return crawl_start_time

@task_success.connect
def run_timer_success_handler(sender, result, **kwargs):

    print '##################################'
    print 'in run_timer_success_handler'

以上工作正常,但如果我尝试按发件人过滤,它永远不会工作:

@task_success.connect(sender='tasks.run_timer')
def run_timer_success_handler(sender, result, **kwargs):

    print '##################################'
    print 'in run_timer_success_handler'

我也试过:@task_success.connect(sender='run_timer') @task_success.connect(sender=run_timer) @task_success.connect(sender=globals()['run_timer'])

它们都不起作用。

我如何有效地使用发送者过滤器来确保为 run_timer 任务而不是其他任务调用回调。

4

2 回答 2

4

在这种情况下,现在最好在函数内部过滤发件人。喜欢:

@task_success.connect
def ...
    if sender == '...':
        ...

因为当任务发送者和工作者是不同的python进程时,当前的芹菜信号实现存在问题。因为它将您的发件人转换为标识符并使​​用它进行过滤,但 celery 通过字符串名称发送任务。这是问题代码(celery.utils.dispatch.signals):

def _make_id(target):  # pragma: no cover
    if hasattr(target, 'im_func'):
        return (id(target.im_self), id(target.im_func))
    return id(target)

并且 id('tasks.run_timer') 与工作进程的 id('tasks.run_timer') 不同。如果你愿意,你可以破解它并用哈希函数替换id

于 2013-09-25T16:19:09.207 回答
4

http://docs.celeryproject.org/en/latest/userguide/signals.html#task-success ...发件人是执行的任务对象。(与 after_task_publish.sender 不同)......所以,你应该

@task_success.connect(sender=run_timer)
def ...

这个对我有用。祝你好运。

于 2016-04-19T02:35:38.493 回答