10

我有 celery beat 和 celery(四名工人)批量做一些加工步骤。其中一项任务大致类似于“对于每个尚未创建 Y 的 X,创建一个 Y”。

该任务以半快速(10 秒)的速度定期运行。任务完成得非常快。还有其他任务正在进行。

我多次遇到过节拍任务明显积压的问题,因此同时执行相同的任务(来自不同的节拍时间),导致错误地重复工作。似乎任务是乱序执行的。

  1. 是否可以限制 celery beat 以确保一次只有一个未完成的任务实例?在任务上设置类似rate_limit=5的“正确”方法吗?

  2. 是否可以确保按顺序执行 beat 任务,例如,不是分派任务,而是 beat 将其添加到任务链中?

  3. 除了使这些任务本身以原子方式执行并且可以安全地同时执行之外,最好的处理方法是什么?这不是我所期望的击败任务的限制……</p>

任务本身的定义很天真:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

这是一个实际的(清理过的)日志:

  • [00:00.000]foocorp.tasks.add_y_to_xs 已发送。id->#1
  • [00:00.001]收到任务:foocorp.tasks.add_y_to_xs[#1]
  • [00:10.009]foocorp.tasks.add_y_to_xs 已发送。id->#2
  • [00:20.024]foocorp.tasks.add_y_to_xs 已发送。id->#3
  • [00:26.747]收到任务:foocorp.tasks.add_y_to_xs[#2]
  • [00:26.748]任务池:应用 #2
  • [00:26.752]收到任务:foocorp.tasks.add_y_to_xs[#3]
  • [00:26.769]接受的任务:foocorp.tasks.add_y_to_xs[#2] pid:26528
  • [00:26.775]任务 foocorp.tasks.add_y_to_xs[#2] 在 0.0197986490093s 内成功:无
  • [00:26.806]任务池:应用 #1
  • [00:26.836]任务池:应用 #3
  • [01:30.020]接受的任务:foocorp.tasks.add_y_to_xs[#1] pid:26526
  • [01:30.053]接受的任务:foocorp.tasks.add_y_to_xs[#3] pid:26529
  • [01:30.055]foocorp.tasks.add_y_to_xs[#1]:为 X id 添加 Y #9725
  • [01:30.070]foocorp.tasks.add_y_to_xs[#3]:为 X id 添加 Y #9725
  • [01:30.074]任务 foocorp.tasks.add_y_to_xs[#1] 在 0.0594762689434s 内成功:无
  • [01:30.087]任务 foocorp.tasks.add_y_to_xs[#3] 在 0.0352867960464s 内成功:无

我们目前使用 Celery 3.1.4 和 RabbitMQ 作为传输。

编辑丹,这是我想出的:

丹,这就是我最终使用的:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield

以及 celery 任务装饰器(仅用于周期性任务):

from functools import wraps
from preo.extensions import db


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task
4

5 回答 5

12
from functools import wraps
from celery import shared_task


def skip_if_running(f):
    task_name = f'{f.__module__}.{f.__name__}'

    @wraps(f)
    def wrapped(self, *args, **kwargs):
        workers = self.app.control.inspect().active()

        for worker, tasks in workers.items():
            for task in tasks:
                if (task_name == task['name'] and
                        tuple(args) == tuple(task['args']) and
                        kwargs == task['kwargs'] and
                        self.request.id != task['id']):
                    print(f'task {task_name} ({args}, {kwargs}) is running on {worker}, skipping')

                    return None

        return f(self, *args, **kwargs)

    return wrapped


@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
    pass


test_single_task.delay()
于 2016-02-04T09:41:51.817 回答
7

做到这一点的唯一方法是自己实施锁定策略

阅读此处的部分以供参考。

与 cron 一样,如果第一个任务在下一个任务之前没有完成,任务可能会重叠。如果这是一个问题,您应该使用锁定策略来确保一次只能运行一个实例(参见示例确保一次只执行一个任务)。

于 2014-01-03T01:22:14.673 回答
3

我使用celery-once解决了这个问题,我将其扩展到celery-one

两者都为您的问题服务。它使用 Redis 锁定正在运行的任务。celery-one还将跟踪锁定的任务。

下面是一个非常简单的 celery beat 用法示例。在下面的代码中,slow_task每 1 秒安排一次,但它的完成时间是 5 秒。即使它已经在运行,普通的 celery 也会每秒安排一次任务。celery-one会阻止这种情况。

celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL

from datetime import timedelta

celery.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.slow_task',
        'schedule': timedelta(seconds=1),
        'args': (1,)
    },
}

celery.conf.CELERY_TIMEZONE = 'UTC'


@celery.task(base=QueueOne, one_options={'fail': False})
def slow_task(a):
    print("Running")
    sleep(5)
    return "Done " + str(a)
于 2016-01-08T16:16:39.960 回答
2

我尝试编写一个装饰器以使用类似于 erydo 在他的评论中提到的Postgres 咨询锁定。

它不是很漂亮,但似乎工作正常。这是 Python 2.7 下的 SQLAlchemy 0.9.7。

from functools import wraps
from sqlalchemy import select, func

from my_db_module import Session # SQLAlchemy ORM scoped_session

def pg_locked(key):
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kw):
            session = db.Session()
            try:
                acquired, = session.execute(select([func.pg_try_advisory_lock(key)])).fetchone()
                if acquired:
                    return f(*args, **kw)
            finally:
                if acquired:
                    session.execute(select([func.pg_advisory_unlock(key)]))
        return wrapped
    return decorator

@app.task
@pg_locked(0xdeadbeef)
def singleton_task():
    # only 1x this task can run at a time
    pass

(欢迎对改进方法提出任何意见!)

于 2014-08-26T20:48:10.627 回答
1

需要分布式锁定系统,因为这些 Celery beat 实例本质上是不同的进程,可能跨不同的主机。

ZooKeeper 和 etcd 等中心坐标系适用于分布式锁定系统的实现。

我推荐使用轻量且快速的 etcd。对 etcd 的锁定有几种实现,例如:

python-etcd-锁

于 2016-01-10T18:26:12.963 回答