0

普通功能可以作为 django 管理操作执行。我想将数据导出为 csv 文件。由于数据的大小,我试图将其作为芹菜任务执行。但是模型、请求、查询集等对象不能传递给任务。有什么方法可以将管理操作作为 celery 任务执行。

4

1 回答 1

1

从 celery 任务或任何地方(例如管理命令)执行管理操作:

from celery import shared_task
from django.contrib import admin
from django.test.client import RequestFactory
from django.contrib.auth.models import User

@shared_task
def my_task(pk_of_model):
    '''
    Task executes a delete_selected admin action.
    '''

    # the queryset is the set of objects selected from the change list
    queryset = MyModel.objects.filter(pk=pk_of_model)

    # we use the django request factory to create a bogus request
    rf = RequestFactory()

    # the post data must reflect as if a user selected the action
    # below we use a 'delete' action and specify post:'post' to
    # simulate the user confirmed the delete

    request = rf.post(
        '/admin/app/model',   # url of the admin change list
        {
            '_selected_action': [m.pk for m in queryset],
            'action': 'delete_selected',
            'post': 'post', 
        }
    )

    # the request factory does not use any middlewares so we add our
    # system user - some admin user all the tasks and commands run as.
    request.user = User.objects.get(username='SYSTEM') # must exist

    # the admin site registry holds all the ModelAdmin
    # instances where our actions are declared
    admin.site._registry[MyModel].delete_selected(request, queryset)

上面的示例将失败,因为该delete_selected操作依赖于messages中间件并且请求工厂不使用任何中间件。可以将最后的执行行包装在 a 中,try: ... except MessageFailure: pass但很可能您将执行自己的自定义操作,您可以在其中检查消息中间件是否已启用。

于 2015-06-22T20:10:47.170 回答