3

我做了某种迷你框架,以便能够在以更优雅的方式处理错误的celery情况下捕获连接错误,并且除了使用.rabbitmqsend_task

这是一些代码来阐明这个想法:

class MyBaseTask(base_task.Task):
    """ Base Class to handle tasks from Me hohoho!"""
    abstract = True

    @classmethod
    def delay(cls, *args, **kwargs):
        """Hook to catch connection errors"""
        try:
            return super(MyBaseTask, cls).apply_async(args, kwargs)
        except socket.error as e:
            cls._safe_failover()  # a function to handle this error
            cls.get_logger().error(str(e))
        except Exception as e:
            cls.get_logger().error("Uknown Error: %s" % str(e))
            raise  # normal exception

现在我继承MyBaseTask类:

class MyL33tTask(MyBaseTask):
    name = 'task.my_leet_task'

    def run(self, *args, **kwargs):
        # yada yada

当套接字错误发生时(AKA,当关闭时),它将执行 safe_failover 函数rabbitmq。可悲的是,当我使用它时不会发生这种情况,send_task('task.my_leet_task')因为它使用了某种MyBaseTask未加载的代理。

有没有一种简单的方法可以替代send_task使用MyBaseTask

4

1 回答 1

0

到目前为止,我了解您的逻辑的主要用途是确保您实际上正在向代理发送任务。

如果我的理解是正确的,那么您的方法可能是错误的,让我解释一下原因。使用消息传递时,主要优点是您可以通过将消息传递给代理来安排任务,方法 send_task 实际上他对任务本身一无所知,它只是为 Celery 编写格式良好的消息并发送它到配置的代理(http://docs.celeryproject.org/en/latest/faq.html#can-i-call-a-task-by-name)。

考虑到这一点,很明显应该在调用 *send_message* 的地方处理“发送消息失败”的异常捕获。延迟方法可以保持这种方式,但我建议更加明确并保留实际调用延迟()的捕获逻辑,因为如果任务未安排,该怎么办也不取决于任务,而是取决于调度器。

于 2014-01-03T10:00:12.113 回答