我正在回答自己,因为我找到了解决方案。
解决方案是在进程中使用嵌入式 ActiveMQ 代理。我总是向这个嵌入式代理发布消息,并且出于显而易见的原因,我不必测试它的可用性或处理重新连接 - 它就在那里,在我的过程中。
ActiveMQ 本身有一种连接代理的方法,这样一个消息总是被传递到下一个,并且 ActiveMQ 自动处理所有连接的东西。如果代理在应用程序启动时关闭,则初始连接,或者如果代理在飞行中关闭,则重新连接。
您也可以自由选择嵌入式代理的存储。如果您将消息持久化到磁盘,如果您的应用程序在消息在嵌入式代理中排队之后但在 ActiveMQ 设法传递到中央代理之前出现故障,ActiveMq 还将负责将消息发布到中央代理。
这是 grails-app/conf/spring 中用于设置它的 bean 代码。我有点春天菜鸟,所以我相信有更好的方法。使用 MethodInvokingFactoryBean 可以很容易地不用 Spring 本身进行所有花哨的初始化,这帮助我在路上完成了这个节目。
activemqLocalMessageDeliveryConnection(org.springframework.beans.factory.config.MethodInvokingFactoryBean) {
targetClass = "com.myapp.ActiveMQLocalBrokerHelper"
targetMethod = "createConnection"
arguments = [ref("grailsApplication")]
}
activemqLocalMessageDeliveryProducerSession(org.springframework.beans.factory.config.MethodInvokingFactoryBean) {
targetClass = "com.myapp.ActiveMQLocalBrokerHelper"
targetMethod = "createProducerSession"
arguments = [ref("activemqLocalMessageDeliveryConnection")]
}
activemqLocalMessageDeliveryProducer(org.springframework.beans.factory.config.MethodInvokingFactoryBean) { bean ->
targetClass = "com.myapp.ActiveMQLocalBrokerHelper"
targetMethod = "createProducer"
arguments = [ref("grailsApplication"), ref("activemqLocalMessageDeliveryProducerSession")]
}
这是 ActiveMQLocalBrokerHelper 的代码。
package com.myapp
import java.io.File
import org.apache.activemq.command.ActiveMQDestination
class ActiveMQLocalBrokerHelper {
static javax.jms.Connection createConnection(grailsApplication) {
// http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
def localBrokerName = "localMessageDelivery"
def broker = new org.apache.activemq.broker.BrokerService()
broker.setPersistent(false)
broker.setBrokerName(localBrokerName)
broker.start()
def connFactory = new org.apache.activemq.ActiveMQConnectionFactory(broker.getVmConnectorURI());
def conn = connFactory.createConnection();
conn.start()
return conn
}
static javax.jms.Session createProducerSession(conn) {
def session = conn.createSession(true, javax.jms.Session.AUTO_ACKNOWLEDGE);
return session
}
static javax.jms.MessageProducer createProducer(grailsApplication, session) {
def destination = session.createQueue("MessageDelivery")
def producer = session.createProducer(destination)
producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT)
return producer
}
}
这就是获得嵌入式 ActiveMQ 代理所需的全部内容。这个特定的不是持久的,而是事务性的。当然,您可以自由地使用 ActiveMQ Java API 来做任何事情。要排队和使用消息,您只需使用普通的 ActiveMQ API。例子:
def activemqLocalMessageDeliveryProducer // Inject the producer
def activemqLocalMessageDeliveryProducerSession // And the session
void enqueue(String messageText) {
def message = session.createTextMessage(messageText)
producer.send(message)
session.commit()
}
消费:
def activemqLocalMessageDeliveryConnection // Inject the connection
void consume() {
def activemqSession = activemqLocalMessageDeliveryConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
def destination = activemqSession.createQueue("MyQueueName")
def consumer = activemqSession.createConsumer(destination)
while (true) {
def message = consumer.receive()
// Do something with message
}
}