我的 tasks.py 文件中有三个 Celery @tasks,它们通常由不同的工作人员同时排队和处理,每个工作人员的处理时间相似。我相信我遇到的问题是他们都试图在其他用户配置文件对象完成之前更新同一个用户配置文件对象。似乎三个进程中最后一个完成的是成功写入数据库的进程。如果我在任务之间间隔几秒钟运行这些,一切都很好。
知道问题出在哪里,或者有什么方法可以继续尝试保存到配置文件,直到它真正起作用?
在此先感谢您的帮助!
我的 tasks.py 文件中有三个 Celery @tasks,它们通常由不同的工作人员同时排队和处理,每个工作人员的处理时间相似。我相信我遇到的问题是他们都试图在其他用户配置文件对象完成之前更新同一个用户配置文件对象。似乎三个进程中最后一个完成的是成功写入数据库的进程。如果我在任务之间间隔几秒钟运行这些,一切都很好。
知道问题出在哪里,或者有什么方法可以继续尝试保存到配置文件,直到它真正起作用?
在此先感谢您的帮助!
我假设您正在使用 django,因为您将其标记为这样。如果是这样,您可以使用 select_for_update (文档) 来锁定对象。这将阻塞其他工作人员,直到事务完成。如果您的任务运行很长时间,您可能会超时,因此请捕获该异常并在必要时重试。
from django.db import transaction
from celery.task import task
@task
def mytask(mpk):
with transaction.commit_on_success():
my_obj = MyModel.objects.select_for_update().get(pk=mpk)
...
请注意,这不适用于 sqlite。
Django ORM 可以在这里发挥作用。如果您使用model_object.save()
方法,它会更新所有字段。如果您的任务正在更新同一对象中的不同字段,您可以考虑使用ModelClass.objects.filter(pk=model_id).update(some_field=some_value)
,但在这里您可能会陷入不同的 RDBMS 如何实现表/行锁定。
另一种选择是使用Celery Chord并在完成所有获取用户数据的任务后更新用户配置文件。您可能需要实现分布式信号量,因此唯一的和弦任务将同时为同一个用户配置文件执行。
看起来它更像是一个数据库锁定问题。您是否尝试过编辑配置文件并在数据库上允许更多并发?例如在 Postgre Debian 上编辑您的 conf 文件:
nano /etc/postgresql/9.4/main/postgresql.conf
然后你可以在 conf 文件中设置如下内容:
max_connections=100
shared_buffers = 3000MB
temp_buffers = 800MB
effective_io_concurrency = 5
max_worker_processes = 15
这应该允许您在描述时读/写。