2

我的实验应用程序非常简单,尝试使用 Actors 和 Akka 可以做什么。

在 JVM 启动后,它创建了带有几个普通 Actor 的 Actor 系统,JMS 消费者(akka.camel.Consumer)和 JMS 生产者(akka.camel.Producer)。它在参与者和 JMS 生产者 -> JMS 服务器 -> JMS 消费者之间发送一些消息。它基本上通过 JMS 服务与自己对话。

我不时遇到奇怪的行为:似乎不时地,应该发送到 JMS 服务器的第一条消息不知何故丢失了。通过查看我的应用程序日志,我可以看到应用程序正在尝试发送消息,但 JMS 服务器从未收到它。(对于每次运行,我都必须再次启动 JVM&Application)。

Akka Camel 文档提到,某些组件可能在开始时未完全初始化:“某些 Camel 组件可能需要一段时间才能启动,在某些情况下,您可能想知道端点何时被激活并准备好使用。 "

我试图实现以下等待骆驼初始化

val system = ActorSystem("actor-system")
val camel = CamelExtension(system)

val jmsConsumer = system.actorOf(Props[JMSConsumer])
val activationFuture = camel.activationFutureFor(jmsConsumer)(timeout = 10 seconds, executor = system.dispatcher)
val result = Await.result(activationFuture,10 seconds)

这似乎有助于解决这个问题。(虽然,现在删除此步骤时,我无法再重新创建此问题...:/)。

我的问题是这是否是确保所有组件完全初始化的正确方法?

我应该使用

val future = camel.activationFutureFor(actor)(timeout = 10 seconds, executor = system.dispatcher)
Await.result(future, 10 seconds)

为每个 akka.camel.Producer 和 akka.camel.Consumer 演员确保一切都正确初始化?

这是我应该做的,还是应该做其他事情?文档并不干净,而且由于问题只是偶尔发生,因此不容易测试......

4

1 回答 1

1

在发送任何消息之前,您需要初始化骆驼 JMS 组件和生产者。

import static java.util.concurrent.TimeUnit.SECONDS;

import scala.concurrent.Future;

import scala.concurrent.duration.Duration;

import akka.dispatch.OnComplete;

ActorRef producer = system.actorOf(new Props(SimpleProducer.class), "simpleproducer"); 
Timeout timeout = new Timeout(Duration.create(15, SECONDS));

Future<ActorRef> activationFuture = camel.activationFutureFor(producer,timeout,  system.dispatcher());

activationFuture.onComplete(new OnComplete<ActorRef>() {
            @Override
            public void onComplete(Throwable arg0, ActorRef arg1)
                    throws Throwable {

                producer.tell("First!!");
            }
            },system.dispatcher()); 
于 2013-02-27T18:53:28.270 回答