14

In Celery, you can retry any task in case of exception. You can do it like so:

@task(max_retries=5)
def div(a, b):
    try:
        return a / b
    except ZeroDivisionError as exc:
        raise div.retry(exc=exc)

In this case, if you try to divide by zero, the task will be retried five times. But you have to check for errors in your code explicitly: the task will not be retried if you omit the try-except block.

I want my functions to look like:

@celery.task(autoretry_on=ZeroDivisionError, max_retries=5)
def div(a, b):
    return a / b

4 Answers

12

Celery (since version 4.0) has exactly what you were looking for:

@app.task(autoretry_for=(SomeException,))
def my_task():
    ...

See: http://docs.celeryproject.org/en/latest/userguide/tasks.html#automatic-retry-for-known-exceptions

Answered 2017-04-06T14:43:32.967
10

I searched for a solution to this for a while, but found only this feature request.

I decided to write my own decorator for auto-retries:

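import functools

from celery.task import task  # assumed import: the legacy Celery 3.x decorator


def task_autoretry(*args_task, **kwargs_task):
    # Exception type(s) to retry on; defaults to every Exception.
    autoretry_on = kwargs_task.get('autoretry_on', Exception)

    def real_decorator(func):
        @task(*args_task, **kwargs_task)
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                # Return the result so callers still see the task's value.
                return func(*args, **kwargs)
            except autoretry_on as exc:
                raise wrapper.retry(exc=exc)
        return wrapper
    return real_decorator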

With this decorator I can rewrite my previous task:

@task_autoretry(autoretry_on=ZeroDivisionError, max_retries=5)
def div(a, b):
    return a / b
Answered 2013-07-31T19:52:43.890
2

I've modified your answer to work with the existing Celery API (currently 3.1.17):

import functools
import logging

from celery import Celery

logger = logging.getLogger(__name__)


class MyCelery(Celery):
    def task(self, *args_task, **opts_task):
        def real_decorator(func):
            sup = super(MyCelery, self).task

            @sup(*args_task, **opts_task)
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    # Return the result so the task's value is not lost.
                    return func(*args, **kwargs)
                except opts_task.get('autoretry_on', Exception) as exc:
                    logger.info('Yo! We did it!')
                    raise wrapper.retry(exc=exc, args=args, kwargs=kwargs)
            return wrapper
        return real_decorator

Then, in your tasks:

from django.conf import settings

app = MyCelery()
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(autoretry_on=Exception)
def mytask():
    raise Exception('Retrying!')

This allows you to add the autoretry_on functionality to your tasks without having to use a separate decorator to define tasks.

Answered 2015-03-18T14:14:41.287
0

Here is an improved version of the existing answers.

This fully implements the Celery 4.2 behaviour (as documented here) but for Celery 3.1.25.

It also doesn't break the different task decorator forms (with/without parentheses) and returns/raises properly.

import functools
import random

from celery.app.base import Celery as BaseCelery


def get_exponential_backoff_interval(factor, retries, maximum, full_jitter=False):
    """
    Calculate the exponential backoff wait time.

    (taken from Celery 4 `celery/utils/time.py`)
    """
    # Will be zero if factor equals 0
    countdown = factor * (2 ** retries)
    # Full jitter according to
    # https://www.awsarchitectureblog.com/2015/03/backoff.html
    if full_jitter:
        countdown = random.randrange(countdown + 1)
    # Adjust according to maximum wait time and account for negative values.
    return max(0, min(maximum, countdown))


class Celery(BaseCelery):

    def task(self, *args, **opts):
        """
        Overridden to add a back-port of Celery 4's `autoretry_for` task args.
        """
        super_method = super(Celery, self).task

        def inner_create_task_cls(*args_task, **opts_task):
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.autoretry_for
            autoretry_for = tuple(opts_task.get('autoretry_for', ()))  # Tuple[Type[Exception], ...]
            retry_backoff = int(opts_task.get('retry_backoff', False))  # multiplier, default if True: 1
            retry_backoff_max = int(opts_task.get('retry_backoff_max', 600))  # seconds
            retry_jitter = opts_task.get('retry_jitter', True)  # bool
            retry_kwargs = opts_task.get('retry_kwargs', {})

            def real_decorator(func):
                @super_method(*args_task, **opts_task)
                @functools.wraps(func)
                def wrapper(*func_args, **func_kwargs):
                    try:
                        return func(*func_args, **func_kwargs)
                    except autoretry_for as exc:
                        if retry_backoff:
                            retry_kwargs['countdown'] = get_exponential_backoff_interval(
                                factor=retry_backoff,
                                retries=wrapper.request.retries,
                                maximum=retry_backoff_max,
                                full_jitter=retry_jitter,
                            )
                        raise wrapper.retry(exc=exc, **retry_kwargs)
                return wrapper
            return real_decorator

        # handle both `@task` and `@task(...)` decorator forms
        if len(args) == 1:
            if callable(args[0]):
                return inner_create_task_cls(**opts)(*args)
            raise TypeError('argument 1 to @task() must be a callable')
        if args:
            raise TypeError(
                '@task() takes exactly 1 argument ({0} given)'.format(
                    sum([len(args), len(opts)])))
        return inner_create_task_cls(**opts)
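
To make the retry schedule concrete, here is a small standalone sketch of the same formula with jitter turned off. The helper name backoff_schedule is hypothetical and not part of Celery; it only lists the countdown that each successive retry would get.

```python
def backoff_schedule(factor, maximum, attempts):
    """Return the un-jittered countdown (in seconds) for each retry attempt."""
    # Same formula as get_exponential_backoff_interval with full_jitter=False.
    return [max(0, min(maximum, factor * (2 ** retries)))
            for retries in range(attempts)]


print(backoff_schedule(factor=1, maximum=600, attempts=6))  # [1, 2, 4, 8, 16, 32]
print(backoff_schedule(factor=1, maximum=10, attempts=6))   # [1, 2, 4, 8, 10, 10]
```

With retry_jitter left at its default of True, each countdown is instead a random integer between 0 and the uncapped value, and is then capped at retry_backoff_max.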

I have also written some unit tests for this, as I am using it in my project.

They can be found in this gist, but note that they are not easily runnable. Treat them more as documentation of how the above feature works (and validation that it works properly).
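
The handling of both decorator forms in the class above can be illustrated without Celery. The following is a minimal plain-Python sketch; the retrying decorator and its times option are hypothetical names used only for this illustration, and it retries synchronously instead of rescheduling a task.

```python
import functools


def retrying(*args, **opts):
    """Hypothetical decorator usable as `@retrying` or `@retrying(times=3)`."""
    times = opts.get('times', 1)  # number of extra attempts after a failure

    def real_decorator(func):
        @functools.wraps(func)
        def wrapper(*func_args, **func_kwargs):
            for attempt in range(times + 1):
                try:
                    return func(*func_args, **func_kwargs)
                except Exception:
                    if attempt == times:
                        raise  # out of attempts: propagate the error
        return wrapper

    # Bare form: Python passes the decorated function as the only argument.
    if len(args) == 1 and callable(args[0]):
        return real_decorator(args[0])
    if args:
        raise TypeError('use @retrying or @retrying(times=...)')
    # Called form: return the decorator to be applied next.
    return real_decorator


@retrying
def bare():
    return 'bare'


calls = []


@retrying(times=2)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ValueError('not yet')
    return len(calls)
```

Here bare() returns 'bare', and flaky() fails twice before returning 3 on its third attempt.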

Answered 2018-08-15T14:15:39.317