0

我正在寻找插件方法(jms 和/或骆驼路由插件)的替代方法来使用 Grails 中的 ActiveMQ。到目前为止一切顺利,但我无法找到管理连接的任何好的解决方案。

这是我的配置/spring/resources.groovy:

import org.springframework.jms.connection.SingleConnectionFactory
import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.jms.listener.adapter.MessageListenerAdapter
import org.springframework.jms.listener.DefaultMessageListenerContainer

beans = {
    jmsConnectionFactory(SingleConnectionFactory) {
        targetConnectionFactory = { ActiveMQConnectionFactory cf ->
            brokerURL = "tcp://localhost:61616"
        }
    }

    jmsMessageListener(MessageListenerAdapter, ref("myService")) {
        defaultListenerMethod = "onIncomingMessage"
    }

    jmsContainer(DefaultMessageListenerContainer) {
        connectionFactory = jmsConnectionFactory
        destinationName = "StatusSavedTopic"
        messageListener = jmsMessageListener
        autoStartup = true

        // Tells the magic sauce to be an ActiveMQ topic
        pubSubDomain = true
    }
}

如果我设置autoStartup为 true,它可以正常工作,run-app直到我保存我的服务,导致重新编译。当这种情况发生时,连接被断开(通过检查 ActiveMQ Web 控制台确认),并且不再接收消息(显然)。

jmsContainer除了不手动执行此操作并使用 jms 或路由插件之外,是否有其他方法可以确保我保持活力?

4

1 回答 1

2

我正在回答自己,因为我找到了解决方案。

解决方案是在进程中使用嵌入式 ActiveMQ 代理。我总是向这个嵌入式代理发布消息,并且出于显而易见的原因,我不必测试它的可用性或处理重新连接 - 它就在那里,在我的过程中。

ActiveMQ 本身有一种连接代理的方法,这样一个消息总是被传递到下一个,并且 ActiveMQ 自动处理所有连接的东西。如果代理在应用程序启动时关闭,则初始连接,或者如果代理在飞行中关闭,则重新连接。

您也可以自由选择嵌入式代理的存储。如果您将消息持久化到磁盘,如果您的应用程序在消息在嵌入式代理中排队之后但在 ActiveMQ 设法传递到中央代理之前出现故障,ActiveMq 还将负责将消息发布到中央代理。

这是 grails-app/conf/spring 中用于设置它的 bean 代码。我有点春天菜鸟,所以我相信有更好的方法。使用 MethodInvokingFactoryBean 可以很容易地不用 Spring 本身进行所有花哨的初始化,这帮助我在路上完成了这个节目。

activemqLocalMessageDeliveryConnection(org.springframework.beans.factory.config.MethodInvokingFactoryBean) {
    targetClass = "com.myapp.ActiveMQLocalBrokerHelper"
    targetMethod = "createConnection"
    arguments = [ref("grailsApplication")]
}

activemqLocalMessageDeliveryProducerSession(org.springframework.beans.factory.config.MethodInvokingFactoryBean) {
    targetClass = "com.myapp.ActiveMQLocalBrokerHelper"
    targetMethod = "createProducerSession"
    arguments = [ref("activemqLocalMessageDeliveryConnection")]
}

activemqLocalMessageDeliveryProducer(org.springframework.beans.factory.config.MethodInvokingFactoryBean) { bean ->
    targetClass = "com.myapp.ActiveMQLocalBrokerHelper"
    targetMethod = "createProducer"
    arguments = [ref("grailsApplication"), ref("activemqLocalMessageDeliveryProducerSession")]
}

这是 ActiveMQLocalBrokerHelper 的代码。

package com.myapp
import java.io.File
import org.apache.activemq.command.ActiveMQDestination

class ActiveMQLocalBrokerHelper {
    static javax.jms.Connection createConnection(grailsApplication) {
        // http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
        def localBrokerName = "localMessageDelivery"

        def broker = new org.apache.activemq.broker.BrokerService()
        broker.setPersistent(false)
        broker.setBrokerName(localBrokerName)
        broker.start()

        def connFactory = new org.apache.activemq.ActiveMQConnectionFactory(broker.getVmConnectorURI());
        def conn = connFactory.createConnection();
        conn.start()

        return conn
    }

    static javax.jms.Session createProducerSession(conn) {
        def session = conn.createSession(true, javax.jms.Session.AUTO_ACKNOWLEDGE);
        return session
    }

    static javax.jms.MessageProducer createProducer(grailsApplication, session) {
        def destination  = session.createQueue("MessageDelivery")
        def producer = session.createProducer(destination)
        producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT)
        return producer
    }
}

这就是获得嵌入式 ActiveMQ 代理所需的全部内容。这个特定的不是持久的,而是事务性的。当然,您可以自由地使用 ActiveMQ Java API 来做任何事情。要排队和使用消息,您只需使用普通的 ActiveMQ API。例子:

def activemqLocalMessageDeliveryProducer // Inject the producer
def activemqLocalMessageDeliveryProducerSession // And the session

void enqueue(String messageText) {
    def message = session.createTextMessage(messageText)
    producer.send(message)
    session.commit()
}

消费:

def activemqLocalMessageDeliveryConnection // Inject the connection

void consume() {
    def activemqSession = activemqLocalMessageDeliveryConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    def destination = activemqSession.createQueue("MyQueueName")
    def consumer = activemqSession.createConsumer(destination)
    while (true) {
        def message = consumer.receive()
        // Do something with message
    }
}
于 2013-07-30T12:55:55.103 回答