3

我在 Linux 操作系统中使用带有 JBoss6 的 HornetQ,谁能告诉我我们如何控制 JMS 队列来删除消息或更改消息的顺序,与队列连接的 MessageConsumers 的详细信息?

哪种方法最适合这种要求?任何建议表示赞赏。

谢谢。

4

1 回答 1

1

队列是一个简单的概念——多个源写入队列,单个消费者按照接收顺序一个接一个地读取消息。试图引入随机访问会混淆这个概念,并且与队列的意图背道而驰。

如果不能修改消费者来删除或排序消息,那么引入一个中间队列和消息驱动 bean (MDB) 来完成这项工作:MDB 将消费队列上的消息Q,丢弃某些消息并重新排序其他消息在将消息发布到队列之前Q'

前:

Q -> orignal consumer

后:

Q -> your filtering and sorting MDB -> Q' -> original consumer

这保留了您设计中组件的意图,并且在我看来更容易解释和理解。

编辑:您的 MDB 可能类似于下面显示的示例(基于 Java Enterprise Edition 6教程)。本教程还包含有关打包和部署 MDB 的信息。

// MDB to consume messages on the original queue
@MessageDriven(mappedName="jms/IncomingQueue", activationConfig =  {
        @ActivationConfigProperty(propertyName = "acknowledgeMode",
                                  propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "destinationType",
                                  propertyValue = "javax.jms.Queue")
    })
public class MyMDB implements MessageListener {
    @EJB
    private MessageFilter messageFilter;

    public void onMessage(Message message) {
        // pass on to MessageFilter bean for processing
        messageFilter.filter(message);
    }
}

// singleton bean to filter and sort messages, then re-publish to the original consumer
// if singleton doesn't work in your environment then you might have to persist the
// messages using e.g. JPA
@Singleton
public class MessageFilter {
    @Resource(name="jms/OutgoingQueue")
    Queue outgoingQueue;

    @Resource(name="jms/QueueConnectionFactory")
    QueueConnectionFactory qcf;

    // accept incoming message from the MDB
    void filter(Message message) {
        // filter and sort messages
        ...

        // send to queue read by the original consumer
        send(message);            
    }

    // send message to the filtered & sorted queue for the original consumer
    void send(Message message) {
        QueueConnection queueConnection = qcf.createQueueConnection();
        QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueSender queueSender = queueSession(outgoingQueue);

        queueSender.send(message);
    }
}

Java EE 6 教程还包含有关如何创建单例 bean的示例,这里是连接到队列以发送消息的教程

于 2011-06-07T22:28:22.857 回答