
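I want to send an acknowledgement to ActiveMQ when the variable x equals 1. If x is not equal to 1, I want the message to be redelivered, so that ActiveMQ delivers it to the subscriber again. To do this, I wrote the following program.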

MessageConsumer.java:

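    import java.io.IOException;
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;

    public class Consumer extends HttpServlet {
        @Override
        protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
                throws ServletException, IOException {
            try {
                ActiveMQConnectionFactory connectionFactory =
                        new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61617");
                RedeliveryPolicy policy = new RedeliveryPolicy();
                policy.setInitialRedeliveryDelay(1000L);
                policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
                connectionFactory.setRedeliveryPolicy(policy);
                connectionFactory.setUseRetroactiveConsumer(true);
                Connection connection = connectionFactory.createConnection();
                // Transacted session: the acknowledge-mode argument is ignored when the first argument is true
                final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                Topic topic = session.createTopic("MessageTesting");
                javax.jms.MessageConsumer consumer = session.createConsumer(topic);
                // anonymous class
                MessageListener listener = new MessageListener() {
                    @Override
                    public void onMessage(Message msg) {
                        TextMessage msg1 = (TextMessage) msg;
                        try {
                            String messageBody = msg1.getText();
                            // x is set by processing logic that is not shown in this post
                            if (x == 1) {
                                // Processing completed, so acknowledge by committing
                                session.commit();
                            } else {
                                // Processing did not complete successfully, so roll back to have the message redelivered
                                session.rollback();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                consumer.setMessageListener(listener);
                connection.start();
            } catch (JMSException e) {
                // Surface JMS setup failures to the servlet container
                throw new ServletException(e);
            }
        }
    }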

Is this the right approach? Could you tell me whether there is a more efficient way?

Thanks.


1 Answer


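Ideally, you should:

Create an ActiveMQConnectionFactory for your bind address, then create a RedeliveryPolicy and set it on the ActiveMQConnectionFactory. Create a transacted session (pass true as the first argument to createSession), then call session.commit() when processing succeeds and session.rollback() when it fails.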
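To make the commit/rollback behaviour concrete, here is a minimal sketch that models it in plain Java with no broker involved. It is a toy model, not ActiveMQ's API or implementation: the class RedeliverySketch, its Result type and the deliver method are hypothetical names invented for illustration. It assumes that a rollback triggers one redelivery, and that a message whose redeliveries exceed the configured maximum is dead-lettered instead of being delivered again.

```java
import java.util.function.Predicate;

/** Toy in-memory model of transacted consumption; hypothetical, not the ActiveMQ API. */
class RedeliverySketch {

    /** Outcome of delivering one message under this model. */
    static final class Result {
        final int deliveries;       // total delivery attempts, including the first
        final boolean deadLettered; // true if the redelivery limit was exhausted

        Result(int deliveries, boolean deadLettered) {
            this.deliveries = deliveries;
            this.deadLettered = deadLettered;
        }
    }

    /**
     * Delivers the body to the handler until the handler returns true
     * (modelling session.commit()) or the redelivery limit is exhausted.
     * A handler result of false models session.rollback().
     * A negative limit means "no limit" in this model, so a handler that
     * never succeeds would then loop forever.
     */
    static Result deliver(String body, Predicate<String> handler, int maximumRedeliveries) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            if (handler.test(body)) {
                return new Result(deliveries, false); // committed
            }
            int redeliveriesUsed = deliveries - 1;    // the first delivery is not a redelivery
            if (maximumRedeliveries >= 0 && redeliveriesUsed >= maximumRedeliveries) {
                return new Result(deliveries, true);  // limit exhausted: dead-lettered
            }
            // otherwise: rolled back, so loop and deliver again
        }
    }
}
```

Under this model, a handler that succeeds on its third call yields three deliveries and no dead-lettering, while a handler that always fails with a limit of 6 is called 7 times (one initial delivery plus six redeliveries) before the message is dead-lettered. With an unlimited policy, as in the question, a message that can never be processed is redelivered indefinitely, so a finite limit may be worth considering.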

These two links may help you:

http://activemq.apache.org/message-redelivery-and-dlq-handling.html

http://activemq.apache.org/redelivery-policy.html

    public class Consumer extends HttpServlet {
        @Override
        protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
                throws ServletException, IOException {
            try {
                ...
                MessageListener listener = new MessageListener() {
                    public void onMessage(Message msg) {
                        ....
                    }
                };
                ....
            }

        }

answered 2013-09-10T12:55:56.427