我有一个第三方发布的 JMS 队列。我想在不同的机器上设置多个消费者,只有一台特定机器的消费者,确认该队列上的消息。简而言之,如果特定机器的消费者没有收到消息,则不应从队列中删除该消息。这是可以实现的吗?
2 回答
好的,您可能有此设置的原因,并且很容易实现。
我会使用本地会话事务。根据某些标准提交或回滚事务相当容易,例如哪个服务器正在使用消息。如果回滚,则消息将再次首先出现在队列中。
示例代码可能如下所示:
public class MyConsumer implements MessageListener{
Session sess;
public void init(Connection conn, Destination dest){
// connection and destination from JNDI, or some other method.
sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(dest);
cons.setMessageListener(this);
conn.start();
}
@Override
public void onMessage(Message msg) {
// Do whatever with message
if(isThisTheSpecialServer()){
sess.commit();
}else{
sess.rollback();
}
}
private boolean isThisTheSpecialServer(){
// figure out if this server should delete messages or not
}
}
如果您在带有 JTA 的 Java EE 容器中执行此操作并且正在使用 UserTransactions,则只需调用 UserTransaction.setRollBack(); 或者,如果您使用的是声明性事务,则可以在阅读消息并完成操作后抛出运行时异常以使事务失败并将消息回滚到队列中。请注意,使用这种方法也将回滚数据库更改(如果您使用的是 JTA 而不是本地 JMS 事务)。
更新:
你真的应该使用交易而不是确认来做到这一点。
可以在此处找到该主题的摘要(针对 ActiveMQ,但通常针对 JMS 编写)。 http://activemq.apache.org/should-i-use-transactions.html
我不知道这种行为是否与所有 JMS 实现一致,但对于 ActiveMQ,如果您尝试使用带有 Session.CLIENT_ACKNOWLEDGEMENT 的非事务性会话,那么它的行为将不会像您预期的那样。已读取但未确认的消息仍在队列中,但不会“释放”并传递给其他 JMS 消费者,直到与第一个消费者的连接中断(即 connection.close()、崩溃或相似的)。
使用本地事务,您可以通过 session.commit() 和 session.rollback() 显式控制它。我认为不使用事务没有任何意义。确认只是为了保证交付。
另一种看待这个问题的方法是转发队列。您可以通过执行以下操作将其应用于您的设计:
- 在第三方发布的队列上创建消费者。
- 这个消费者有一项工作——将每条消息分发到其他队列。
- 创建您的真正订阅者将收听的其他队列。
- 对您的消息侦听器进行编码以获取每条消息并将其转发到各个目的地。
- 更改每个侦听器以从其特定队列中读取。
通过这样做,您可以确保每个侦听器都能看到每条消息,每个事务都按预期工作,并且您不对消息的发送方式做出任何假设(例如,如果发布方正在做什么AUTO_ACKNOWLEDGE
?)