3

我正在使用DjangoCeleryRabbitMQ。我有一个发送电子邮件的简单任务。此任务有效,但速度很慢。

例如,我发送了5000 封电子邮件,所有 5000 封电子邮件都像往常一样直接发送到RabbitMQ,但是一旦进入消息代理,它就需要大约 30 分钟才能完成并清除所有任务。

如果没有 Celery,这些相同的任务只需几分钟即可处理所有 5000 个任务。

我错过了配置的东西吗?如果有人能发现我的速度问题,那将非常有帮助。

任务.py

class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        logging.debug("About to send a message.")


        try:
            message = Message.objects.get(pk=message_id)
        except Exception as exc:
            raise SendMessage.retry(exc=exc)

        if not gateway_id:
            if hasattr(message.billee, 'sms_gateway'):
                gateway = message.billee.sms_gateway
            else:
                gateway = Gateway.objects.all()[0]
        else:
            gateway = Gateway.objects.get(pk=gateway_id)

        account = Account.objects.get(user=message.sender)
        if account._balance() >= message.length:
            response = gateway._send(message)

            if response.status == 'Sent':
                # Take credit from users account.
                transaction = Transaction(
                    account=account,
                    amount=- message.charge,

                )
                transaction.save()
                message.billed = True
                message.save()
        else:
            pass

设置.py

# Celery
BROKER_URL = 'amqp://admin:xxxxxx@xx.xxx.xxx.xxx:5672//'
CELERY_SEND_TASK_ERROR_EMAILS = True

阿帕奇配置

<VirtualHost *:80>
ServerName www.domain.com

DocumentRoot /srv/project/domain


WSGIDaemonProcess domain.com processes=2 threads=15 display-name=%{GROUP}
WSGIProcessGroup domain.com

WSGIScriptAlias / /srv/project/domain/apache/django.wsgi
ErrorLog /srv/project/logs/error.log
</VirtualHost>

会议

# Name of nodes to start, here we have a single node
#CELERYD_NODES="w1"
# or we could have three nodes:
CELERYD_NODES="w1 w2 w3"

# Where to chdir at start.
CELERYD_CHDIR="/srv/project/domain"

# How to call "manage.py celeryd_multi"
CELERYD_MULTI="$CELERYD_CHDIR/manage.py celeryd_multi"

# How to call "manage.py celeryctl"
CELERYCTL="$CELERYD_CHDIR/manage.py celeryctl"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=900 --concurrency=8"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/srv/project/logs/celery/%n.log"
CELERYD_PID_FILE="/srv/project/celery/%n.pid"

# Workers should run as an unprivileged user.
CELERYD_USER="root"
CELERYD_GROUP="root"

# Name of the projects settings module.
export DJANGO_SETTINGS_MODULE="domain.settings"

# Celery Beat Settings.

# Where to chdir at start.
CELERYBEAT_CHDIR="/srv/project/domain"

# Path to celerybeat
CELERYBEAT="$CELERYBEAT_CHDIR/manage.py celerybeat"
4

1 回答 1

9

您正在处理约 2.78 个任务/秒(5000 个任务在 30 分钟内),我同意这并不高。您有 3 个节点,每个节点以 8 个并发运行,因此您应该能够并行处理 24 个任务。

检查事项:

CELERYD_PREFETCH_MULTIPLIER- 默认情况下设置为 4,但如果您有很多短任务,则值得增加它。它将减少从代理获取消息的时间的影响,代价是任务不会在工作人员之间均匀分布。

数据库连接/查询 - 我计算成功案例执行了 5+ 数据库查询。如果您使用 django-celery 的默认结果后端,则有额外的查询用于将任务结果存储在数据库中。django-celery 还将在每个任务之后关闭并重新打开数据库连接,这会增加一些开销。如果您有 5 个查询并且每个查询需要 100 毫秒,那么无论有没有 celery,您的任务至少需要 500 毫秒。自己运行查询是一回事,但您还需要确保您的任务中没有任何内容会锁定表/行,从而阻止其他任务有效地并行运行。

网关响应时间 - 您的任务似乎调用了一个远程服务,我假设它是一个 SMS 网关。如果该服务器响应缓慢,那么您的任务将很慢。同样,单个呼叫的响应时间与您在峰值负载下执行此操作时的响应时间可能不同。在美国,长代码 SMS 只能以每秒 1 条的速率发送,并且取决于网关在哪里进行速率限制,那么它可能会减慢您的任务。

于 2013-08-20T17:06:34.020 回答