0

我有以下使用 Jython 调用 celery.send_task() 方法的 Java 代码:

import org.python.util.PythonInterpreter;

public class SendTask {

    static final String SEND_TASK_PY =
            "from celery import Celery\n"+
            "c = Celery('tasks', broker='amqp://', backend='amqp://')\n"+
            "c.send_task('tasks.add', args=[2, 2], kwargs={})";

    public static void main(String[] args) {
        PythonInterpreter interpreter = new PythonInterpreter();
        interpreter.exec(SEND_TASK_PY);
    }
}

导致以下错误:

mvn clean package
java -cp target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar  SendTask

Exception in thread "MainThread" Traceback (most recent call last):
  File "<string>", line 3, in <module>
  File "/home/itaif/dev/jython-celery/target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar/Lib/celery/app/base.py", line 245, in send_task
  File "/home/itaif/dev/jython-celery/target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar/Lib/celery/app/amqp.py", line 242, in publish_task
  File "/home/itaif/dev/jython-celery/target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar/Lib/kombu/messaging.py", line 155, in publish
  File "/home/itaif/dev/jython-celery/target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar/Lib/kombu/messaging.py", line 232, in _prepare
  File "/home/itaif/dev/jython-celery/target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar/Lib/kombu/serialization.py", line 170, in encode
  File "/home/itaif/dev/jython-celery/target/jython-celery-0.1-SNAPSHOT-jar-with-dependencies.jar/Lib/kombu/serialization.py", line 356, in dumps
TypeError: dumps(): takes no keyword arguments        

作为参考,这里是创建 jar 文件的 maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jython-celery</groupId>
<artifactId>jython-celery</artifactId>
<packaging>jar</packaging>
<version>0.1-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.python</groupId>
        <artifactId>jython-standalone</artifactId>
        <version>2.7-b1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>net.sf.mavenjython</groupId>
            <artifactId>jython-compile-maven-plugin</artifactId>
            <version>1.2</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>jython</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <libraries>
                    <param>Celery</param>
                </libraries>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <id>package-jar-with-dependencies</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>SendTask</mainClass>
                                <addClasspath>true</addClasspath>
                            </manifest>
                        </archive>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4

1 回答 1

0

错误指示的问题在 msgpack 序列化程序中。通过将序列化程序更改为 JSON,问题就消失了。

导入 org.python.util.PythonInterpreter;

公共类发送任务{

static final String SEND_TASK_PY =
        "from celery import Celery\n"+
        "c = Celery('tasks', broker='amqp://', backend='amqp://')\n"+
        "c.conf.CELERY_TASK_SERIALIZER='json'\n"+
        "result = c.send_task('tasks.add', args=[2, 2], kwargs={}).get()";

public static void main(String[] args) {
    PythonInterpreter interpreter = new PythonInterpreter();
    interpreter.exec(SEND_TASK_PY);
    System.out.println(interpreter.get("result"));
} }
于 2013-06-09T08:49:57.637 回答