1

我正在尝试使用以下 lib 版本从“scratch”(读取,“noob”)组合我的第一个 Akka/Camel 应用程序:

  • 阿卡骆驼:2.2.0-RC1

根据我能找到的所有文档(Akka 文档、用户组等),从基于文件的队列中消费我所要做的就是以这种方式设置我的系统:

主类:

actorSystem = ActorSystem.create("my-system");

Props props = new Props(Supervisor.class);
ActorRef supervisor = actorSystem.actorOf(props, "supervisor");

Camel camel = CamelExtension.get(actorSystem);
CamelContext camelContext = camel.context();
camelContext.start();

主管班:

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.camel.javaapi.UntypedConsumerActor;
import org.apache.camel.Message;

/**
 * Manages creation and supervision of UploadBatchWorkers.
 */
public class Supervisor extends UntypedConsumerActor {    
    @Override
    public String getEndpointUri() {
        return "file:///Users/myhome/queue";
    }

    @Override
    public void preStart() {
        String test = "test";
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof CamelMessage) {
            // do something
        }
    }

我的问题是,即使我知道在 preStart() 方法的“测试”行上调试期间正在创建主管对象并中断(更不用说,如果我明确地“告诉”它处理得很好),它不会消耗从定义的端点,即使我有另一个应用程序向同一个端点生成消息。

知道我做错了什么吗?

4

2 回答 2

2

好的,问题是我自己的错,如果您查看 UntypedConsumerActor 继承自的 Consumer 特征,则在示例代码中可以清楚地看到问题。

这种方法:

@Override
public void preStart() {
    String test = "test";
}

覆盖其父级的 preStart() 方法,对吗?好吧,该父方法实际上是使用动态创建的端点注册消费者的方法,因此虽然您可以覆盖它,但您必须调用 super() 否则它将不起作用。

希望这对路上的人有用!

于 2013-07-02T21:23:02.530 回答
0

尝试将您的instanceof内部更改onReceive为:

if (message instanceof CamelMessage){
  //do processing here
}

从哪里来CamelMessage的包akka.camel。这就是 akka camel 文档中的示例正在做的事情。

于 2013-07-02T10:38:32.490 回答