2

我对骆驼很陌生,一直在努力了解如何在特定场景中使用骆驼。在这种情况下,有一个(基于 Java 的)代理会不时生成操作。我需要一个事件驱动的消费者来获得这些事件的通知。这些事件将被路由到“文件”生产者(暂时)。

在骆驼书中,该示例适用于轮询消费者。我找不到事件驱动消费者的通用解决方案。我遇到了 JMX 的类似实现:

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 

JMXEndpoint jmxEndpoint;    
public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 
    super(endpoint, processor);
    this.jmxEndpoint = endpoint;
}

public void handleNotification(Notification notification, Object handback) { 
    try {
        getProcessor().process(jmxEndpoint.createExchange(notification)); 
    } catch (Throwable e) {
        handleException(e); 
    }
}

}

在这里,只要 JMX 通知到达,就会调用 handleNotification。

我相信我必须做类似的事情来让我的消费者在代理生成动作时得到通知。但是,上面的 handleNotification 方法是特定于 JMX 的。网页上说:“在实现自己的事件驱动消费者时,您必须确定一个类似的事件侦听器方法以在您的自定义消费者中实现。”

我想知道:如何识别类似的事件侦听器,以便在我的代理有操作时通知我的消费者。

非常感谢任何指向网页的建议/链接。

4

2 回答 2

5

我知道这是一个老问题,但我一直在努力解决这个问题,只是想我会记录我的发现,以供其他任何人寻找答案。

创建 Endpoint 类(扩展 DefaultEndpoint)时,您将覆盖以下用于创建使用者的方法:

public Consumer createConsumer(Processor processor)

然后,在您的消费者中,您可以访问处理器 - 在此处理器上调用“进程”将创建一个事件并触发路由。

例如,假设您有一些用于侦听消息的 Java API,并具有某种侦听器。在我的例子中,Listener 将传入的消息放到 LinkedBlockingQueue 中,我的 Consumer 'doStart' 方法看起来像这样(添加你自己的错误处理):

@Override
protected void doStart() throws Exception {
    super.doStart();

    // Spawn a new thread that submits exchanges to the Processor
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            while(true) {
                IMessage incomingMessage = myLinkedBlockingQueue.take();
                Exchange exchange = getEndpoint().createExchange();
                exchange.getIn().setBody(incomingMessage);
                myProcessor.process(exchange);  
            }
        }
    };
    new Thread(runnable).start();
}

现在我可以将创建创建此 Consumer 的 Endpoint 的组件放入我的 CamelContext 中,并像这样使用它:

from("mycomponent:incoming").to("log:messages");

并且每次来自 Java API 的新消息到达时都会触发日志消息。

希望对某人有所帮助!

于 2015-08-19T08:59:36.343 回答
4

事件驱动就是骆驼。

任何路由实际上都是一个事件监听器。

给定路线:

from("activemq:SomeQueue").
  bean(MyClass.class);

public class MyBean{
  public void handleEvent(MyEventObject eventPayload){ // Given MyEventObject was sent to this "SomeQueue".
     // whatever processing.
  }
}

这将建立一个事件驱动的消费者。那么如何发送事件呢?如果您的应用程序中嵌入了骆驼并从您的事件操作生成器访问 CamelContext,那么您可以从中获取一个生产者模板,然后将您的事件触发到您在骆驼中定义的任何端点,例如“seda:SomeQueue”。

否则,如果您的 Camel 实例在您的应用程序之外的另一个服务器或实例中运行,那么您应该使用其他传输而不是 SEDA。最好是 JMS,但其他人也会这样做,挑选。ActiveMQ 是我的最爱。您可以轻松启动嵌入式 activemq 实例(JVM 内部)并将其连接到 camel,方法是:

camelContext.addComponent("activemq", activeMQComponent("vm://localhost"));
于 2012-08-14T09:33:20.163 回答