3

我有消息生产者使用ActiveMQ发送有关某些事件的 JMS 消息。但是,与 ActiveMQ 的连接可能不会一直处于打开状态。因此,事件被存储,当连接建立时,它们应该被读取和发送。这是我的代码:

private void sendAndSave(MyEvent event) {
    boolean sent = sendMessage(event);
    event.setProcessed(sent);
    boolean saved = repository.saveEvent(event);
    if (!sent && !saved) {
        logger.error("Change event lost for Id = {}", event.getId());
    }
}

private boolean sendMessage(MyEvent event) {
    try {
        messenger.publishEvent(event);
        return true;
    } catch (JmsException ex) {
        return false;
    }
}

我想创建某种ApplicationEventListener,在建立连接并处理未发送的事件时将调用它。我浏览了 JMS、Spring 框架和 ActiveMQ 文档,但找不到任何线索如何将我的侦听器与 ConnectionFactory 连接起来。

如果有人可以帮助我,我将不胜感激。

这是我的应用程序 Spring 上下文对 JMS 的说明:

<!-- Connection factory to the ActiveMQ broker instance.              -->
<!-- The URI and credentials must match the values in activemq.xml    -->
<!-- These credentials are shared by ALL producers.                   -->
<bean id="jmsTransportListener" class="com.rhd.ams.service.common.JmsTransportListener" 
      init-method="init" destroy-method="cleanup"/>
<bean id="amqJmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${jms.publisher.broker.url}"/>
    <property name="userName" value="${jms.publisher.username}"/>
    <property name="password" value="${jms.publisher.password}"/>
    <property name="transportListener" ref="jmsTransportListener"/>
</bean>

<!-- JmsTemplate, by default, will create a new connection, session, producer for         -->
<!-- each message sent, then close them all down again. This is very inefficient!         -->
<!-- PooledConnectionFactory will pool the JMS resources. It can't be used with consumers.-->
<bean id="pooledAmqJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="amqJmsConnectionFactory" />
</bean>

<!-- Although JmsTemplate instance is unique for each message, it is  -->
<!-- thread-safe and therefore can be injected into referenced obj's. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="pooledAmqJmsConnectionFactory"/>
</bean>
4

1 回答 1

1

你描述这个问题的方式,听起来确实像是 JMS 持久订阅的一个打开和关闭的案例。在走这条路之前,您可能需要考虑更传统的实现。除了注意事项之外,ActiveMQ 提供了您可以收听的咨询消息,并将针对包括新连接在内的各种事件发送这些消息。

=========

射击,对不起......我不明白问题是什么。我认为咨询根本不是解决方案……毕竟,您需要连接到经纪人才能获得它们,但连接是您所知道的。

因此,如果我理解正确(准备重试#2 ....),您需要的是一个客户端连接,当它失败时,它会尝试无限期地重新连接。当它重新连接时,您希望触发一个(或更多)事件,将挂起的消息刷新到代理。

因此检测丢失的连接很容易。您只需注册一个 JMS ExceptionListener。至于检测重新连接,我能想到的最简单的方法是启动重新连接线程。当它连接时,停止重新连接线程并使用 Observer/Observable 或 JMX 通知等通知相关方。即使您只有一个代理,您也可以使用ActiveMQ 故障转移传输,它会为您执行连接重试循环。至少,它应该是,但它并没有为你做很多事情,你自己的重新连接线程无法完成......如果你愿意将一些控制权委托给它,它会缓存你未刷新的消息(请参阅trackMessages _选项),然后在重新连接时发送它们,这就是你想要做的所有事情。

我想如果你的代理宕机了几分钟,这不是一个坏方法,但如果你说的是几个小时,或者你可能会在宕机时间内累积 10k+ 条消息,我只是不知道缓存机制是否如可靠,因为你需要它。

===================

移动应用程序...对。不太适合故障转移传输。然后我会实现一个定期连接的计时器(使用http传输可能是个好主意,但不相关)。当它确实连接时,如果没有什么要冲洗的,那么在 x 分钟内见。如果有,请发送每条消息,等待握手并从您的移动商店中清除该消息。然后在 x 分钟后再见。

我假设这是Android?如果没有,请停止阅读此处。我们实际上在前段时间实现了这一点。我只做了服务器端,但如果我没记错的话,连接计时器/轮询器每 n 分钟旋转一次(我认为是可变频率,因为过于激进会耗尽电池)。一旦建立了成功的连接,我相信他们使用意图广播来推动消息推送者做他们的事情。当时的想法是,即使只有一个消息推送器,我们也可以添加更多。

于 2013-04-25T14:41:47.433 回答