1

我需要什么: 1. 通过 jms 发送消息。2. 保留消息,以便如果未从客户端和服务器重新启动处理,客户端会在服务器重新启动时获取消息。3. 从客户端接收消息的顺序应与发送这些消息的顺序相同。4.队列中有多个消费者,以便并行扩展和处理不同的消息。

我做了什么:我使用活动 mq 并具有以下弹簧配置

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="destination"/>
</bean>

<bean id="template" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="deliveryMode" value="2"/>
    <property name="defaultDestination" ref="destination"/>
</bean>

<bean id="messageListener" class="InternalNotificationJMSConsumer"/>

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" client-id="client1" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

所以我有我的目的地和我的连接池。然后我创建一个spring jmstemplate并开始发送消息

template.send(new MessageCreator() {
  public Message createMessage(Session session) throws JMSException {
    return session.createObjectMessage(message);
  }
});

然后我的接收器如下:

@Component
public class JMSConsumer implements MessageListener {

  @Override
  public void onMessage(Message message) {
    if (message instanceof ObjectMessage) {
      ....
     }
   }

我的问题:

一个。使用此配置的消息不是按顺序接收的。鉴于 listener-container.concurency 中的 xsd 注释,这是有道理的:“在主题侦听器或消息顺序很重要的情况下,将并发限制为 1;”。所以问题是我如何/是否可以保持订单但有多个消费者。

湾。消息确认。我需要的是,在处理完消息后,onMessage 返回,然后才将消息视为已确认并且不会重新传输。我没有事务管理器,因为我不在任何应用程序服务器中,并且希望尽可能避免这种情况。我只需要自己确认消息。这可能吗?

4

2 回答 2

1

好的,这是我的问题的答案。

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

然后在消息监听器实现上: BlockingExecutor executor = new BlockingExecutor(5); //要消费的并发按摩的数量

public void onMessage(Message message) {
....
executor.submitTask(new MessageDispatchCommand((ObjectMessage) message));
....

}

class MessageDispatchCommand implements Runnable {
  private ObjectMessage message;

  public MessageDispatchCommand (final ObjectMessage message) {
    this.message = message;
  }

  public void run() {
    try {
      Serializable msg = message.getObject();
      handle message here
     }....
}

BlockingExecutor 代码可以从这里找到: Java Concurrency in Practice: BoundedExecutor implementation

简单的说。我只有一个 jms 消息侦听器,因为这可以保持消息顺序正确。然后我创建可运行的命令来获取消息并处理它们(这是完成的耗时工作,但它是在不同的线程中完成的)。由于我在所有消息处理线程都已用尽时使用有界执行器,因此 onMessage 阻塞并因此后续消息被简单地持久化,直到有界执行器的线程再次空闲。

于 2013-09-17T14:52:44.680 回答
1

如果您正在并行使用消息,那么由于线程的性质,永远无法保证它们将完全按照它们发送的顺序完成 - 只有它们会按顺序传递给消费者。如果只有某些消息是绝对必须订购的(通常与交易/帐户相关),您可以使用消息组功能将组与消费者相关联 - 一种粘性负载平衡。

自己确认消息绝对是可能的,但可能不需要。有MessageListenerContainer一个acknowledge属性,如果设置为transacted将导致消息仅在MessageListener完成执行后才被确认为已使用,而不会引发异常。不需要事务管理器(尽管PlatformTransactionManager如果您确实想使用 Spring,则不需要任何特定的容器)。

如果您确实想自己确认消息,请设置acknowledgeclient,并message.acknowledge()在您的消息成功完成后调用。如果侦听器在没有调用它的情况下完成,则消息将被发送到 DLQ(默认)或重新传递。

于 2013-09-11T11:02:33.397 回答