6

我在 3 个集群机器上设置了 celery + rabbitmq。我还创建了一个任务,它根据文件中的数据生成正则表达式并使用该信息来解析文本。

from celery import Celery

celery = Celery('tasks', broker='amqp://localhost//')
import re

@celery.task
def add(x, y):
     return x + y


def get_regular_expression():
    with open("text") as fp:
        data = fp.readlines()
    str_re = "|".join([x.split()[2] for x in data ])
    return str_re    



@celery.task
def analyse_json(tw):
    str_re = get_regular_expression()
    re.match(str_re,tw.text) 

我可以使用以下 python 代码非常轻松地调用此任务:-

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x) 

但是,现在我想从 Java 而不是 python 进行相同的调用。我不确定做同样事情的最简单方法是什么。

我编写了这段代码来向 AMQP 代理发送消息。代码运行良好,但任务没有执行。我不确定如何指定应该执行的任务的名称。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

class try1 {
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "celery", "celery");
    String messageBody = "{\"text\":\"i am good\"}" ;
    byte[] msgBytes = messageBody.getBytes("ASCII") ;
    channel.basicPublish(queueName, queueName,
            new AMQP.BasicProperties
            ("application/json", null, null, null,
                    null, null, null, null,
                    null, null, null, "guest",
                    null, null),messageBody.getBytes("ASCII")) ;
    connection.close();    

} }

这是 rabbitMq 错误日志中的输出:-

connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}

任何帮助将不胜感激。

谢谢,阿米特

4

2 回答 2

7

有几个问题。

1) String queueName = channel.queueDeclare().getQueue() 命令返回错误的队列名称。我将队列名称更改为“芹菜”,效果很好。2)json的格式必须是这种类型:- {“id”:“4cc7438e-afd4-4f8f-a2f3-f46567e7ca77”,“task”:“celery.task.PingTask”,“args”:[], “kwargs”:{},“重试”:0,“eta”:“2009-11-17T12:30:56.527191”}

http://docs.celeryproject.org/en/latest/internals/protocol.html中所示

经过这两次更改后,它运行良好。

-阿米特

于 2014-01-03T20:54:08.460 回答
0

celery 隐式声明一个交换,使用 Java 你必须自己声明一个。

请参阅从 Java 与 Django/Celery 互操作

于 2013-12-31T02:55:01.717 回答