2

我在 Python 中使用celery模块 v3.1.25运行 Celery 工作程序,并在 node.js 中使用node-celerynpm 包v0.2.7 (not the latest)运行 Celery 客户端。

当使用 Python Celery 客户端发送作业时,Python Celery worker 工作正常。

问题:使用node-celery客户端向Celery后端发送任务时,在JS控制台报错:

(STDERR) Celery 应该配置 json 序列化器

Python Celery worker 配置有:

app = Celery('tasks', 
    broker='amqp://test:test@192.168.1.26:5672//',
    backend='amqp://',
    task_serializer='json',
    include=['proj.tasks'])

node-celery客户端配置:

var celery = require('node-celery')
var client = celery.createClient({
    CELERY_BROKER_URL: 'amqp://test:test@192.168.1.26:5672//',
    CELERY_RESULT_BACKEND: 'amqp',
    CELERY_TASK_SERIALIZER: "json"
});

client.on('connect', function() {
    console.log('connected');

    client.call('proj.tasks.getPriceEstimates', [start_latitude, start_longitude],
        function(result) {
            console.log('result: ', result);
            client.end();
        })
});

这是 Python Celery worker 上的配置问题吗?我们是否错过了可以将返回序列化格式更改为的配置参数json


更新

根据 ChillarAnand 的建议使用result_serializers和参数更新accept_content

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('tasks', 
    broker='amqp://test:test@192.168.1.26:5672//',
    backend='amqp://',
    task_serializer='json',
    result_serializer='json',
    accept_content=['application/json'],
    include=['proj.tasks'])

但是 node.js Celery 客户端仍然认为它不在json,抛出相同的错误消息。

它给出了这个错误,因为结果是'application/x-python-serialize'.

检查是这种情况,因为 RabbitMQ 管理控制台显示结果是content_type: application/x-python-serialize


这个论坛帖子说这是因为任务是在加载配置之前创建的。

这是我的文件的样子:

项目/芹菜.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('tasks', 
    broker='amqp://test:test@192.168.1.26:5672//',
    backend='amqp://',
    task_serializer='json',
    result_serializer='json',
    accept_content=['application/json'],
    include=['proj.tasks'])

项目/任务.py

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def myTask():
    ...
    return ...

有没有更好的方法来构建代码以确保在任务之前加载配置?

4

1 回答 1

1

配置序列化器时,您还应该指定内容类型、任务序列化器和结果序列化器。

app = Celery(
    broker='amqp://guest@localhost//',
    backend='amqp://',
    include=['proj.tasks'],

    task_serializer='json',
    result_serializer='json',
    accept_content = ['application/json'],
)
于 2017-05-02T05:18:18.677 回答