I'm trying out a few scenarios with JMS on JBoss 4.2.2 and have run into a few problems.
I have a queue
<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=notificationQueue">
    <attribute name="JNDIName">jms.queue.testQueue</attribute>
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
    <attribute name="SecurityConf">
        <security>
            <role name="testUser" read="true" write="true" />
        </security>
    </attribute>
</mbean>
and
<invoker-proxy-binding>
    <name>message-driven-bean</name>
    <invoker-mbean>default</invoker-mbean>
    <proxy-factory>org.jboss.ejb.plugins.jms.JMSContainerInvoker</proxy-factory>
    <proxy-factory-config>
        <JMSProviderAdapterJNDI>DefaultJMSProvider</JMSProviderAdapterJNDI>
        <ServerSessionPoolFactoryJNDI>StdJMSPool</ServerSessionPoolFactoryJNDI>
        <CreateJBossMQDestination>true</CreateJBossMQDestination>
        <MinimumSize>1</MinimumSize>
        <MaximumSize>15</MaximumSize>
        <MaxMessages>16</MaxMessages>
        <MDBConfig>
            <ReconnectIntervalSec>10</ReconnectIntervalSec>
            <DLQConfig>
                <DestinationQueue>queue/DLQ</DestinationQueue>
                <MaxTimesRedelivered>3</MaxTimesRedelivered>
                <TimeToLive>0</TimeToLive>
                <DLQUser>jbossmquser</DLQUser>
                <DLQPassword>letmein</DLQPassword>
            </DLQConfig>
        </MDBConfig>
    </proxy-factory-config>
</invoker-proxy-binding>
To test redelivery, I wrote a MessageListener
import java.util.*;
import javax.jms.*;
import javax.naming.*;

public class NotifyQueueMessageListener {

    public static void main(String[] args) throws NamingException, JMSException {
        Hashtable<String, String> contextProperties = new Hashtable<String, String>();
        contextProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        contextProperties.put(Context.PROVIDER_URL, "jnp://localhost:7099");
        InitialContext initContext = new InitialContext(contextProperties);
        Queue queue = (Queue) initContext.lookup("jms.queue.testQueue");
        QueueConnection queueConnection = null;
        try {
            QueueConnectionFactory connFactory = (QueueConnectionFactory) initContext.lookup("ConnectionFactory");
            queueConnection = connFactory.createQueueConnection("jbossmquser", "letmein");
            Session queueSession = queueConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            queueConnection.setExceptionListener(new MyExceptionListener());
            MessageConsumer consumer = queueSession.createConsumer(queue);
            MyMessageListener messageListener = new MyMessageListener();
            consumer.setMessageListener(messageListener);
            queueConnection.start();
            Object o = new Object();
            synchronized (o) {
                o.wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("closing connection");
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }

    static class MyMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            if (message instanceof ObjectMessage) {
                ObjectMessage om = (ObjectMessage) message;
                try {
                    System.out.printf("MyMessageListener.onMessage( %s ), %s\n\n", om, om.getObject());
                    boolean throwException = om.getBooleanProperty("throw");
                    if (throwException) {
                        System.out.println("throwing exception");
                        throw new NullPointerException("just for testing");
                    }
                    message.acknowledge();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
        }
    }

    static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException jmse) {
            jmse.printStackTrace();
        }
    }
}
and a message sender
import java.text.*;
import java.util.*;
import javax.jms.*;
import javax.naming.*;

public class MessageSender {

    public static void main(String[] args) throws NamingException, JMSException {
        Hashtable<String, String> contextProperties = new Hashtable<String, String>();
        contextProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        contextProperties.put(Context.PROVIDER_URL, "jnp://localhost:7099");
        InitialContext initContext = new InitialContext(contextProperties);
        // same JNDI name as the JNDIName attribute of the queue mbean
        Queue queue = (Queue) initContext.lookup("jms.queue.testQueue");
        QueueConnection queueConnection = null;
        try {
            QueueConnectionFactory connFactory = (QueueConnectionFactory) initContext.lookup("ConnectionFactory");
            queueConnection = connFactory.createQueueConnection("jbossmquser", "letmein");
            // QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            // QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            // QueueSession queueSession = queueConnection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            QueueSender sender = queueSession.createSender(queue);
            ObjectMessage message = queueSession.createObjectMessage();
            message.setBooleanProperty("throw", true); // to throw exception in listener
            message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
            message.setIntProperty("JMS_JBOSS_REDELIVERY_LIMIT", 3);
            sender.send(message);
        } finally {
            System.out.println("closing connection");
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }
}
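Expected behavior
Because I'm throwing an exception from onMessage(), I expected the message to be redelivered several times (<MaxTimesRedelivered>3</MaxTimesRedelivered>) and then moved to the DLQ, but that doesn't happen.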
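To pin down that expectation, here is a minimal plain-Java sketch of the redeliver-then-dead-letter behavior. It is only a model under stated assumptions, not how JBoss MQ is implemented: `RedeliverySketch`, `Msg`, and `Listener` are hypothetical stand-ins for the broker, a JMS message, and `MessageListener`, and the limit of 3 mirrors `MaxTimesRedelivered`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RedeliverySketch {

    /** Hypothetical stand-in for a JMS message; it only tracks its redelivery count. */
    static final class Msg {
        final String body;
        int redeliveries = 0;

        Msg(String body) {
            this.body = body;
        }
    }

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface Listener {
        void onMessage(Msg m);
    }

    final Deque<Msg> queue = new ArrayDeque<Msg>();
    final List<Msg> dlq = new ArrayList<Msg>();
    final int maxTimesRedelivered;

    RedeliverySketch(int maxTimesRedelivered) {
        this.maxTimesRedelivered = maxTimesRedelivered;
    }

    /** Delivers until the queue is empty and returns the number of onMessage() calls. */
    int drain(Listener listener) {
        int deliveries = 0;
        while (!queue.isEmpty()) {
            Msg m = queue.poll();
            deliveries++;
            try {
                listener.onMessage(m); // no exception: the message is consumed
            } catch (RuntimeException e) {
                if (m.redeliveries < maxTimesRedelivered) {
                    m.redeliveries++;
                    queue.addFirst(m); // failure: put it back for another attempt
                } else {
                    dlq.add(m); // limit reached: move it to the dead letter queue
                }
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        RedeliverySketch broker = new RedeliverySketch(3);
        broker.queue.add(new Msg("my message"));
        int deliveries = broker.drain(new Listener() {
            public void onMessage(Msg m) {
                throw new NullPointerException("just for testing");
            }
        });
        System.out.println("deliveries=" + deliveries + ", dlq size=" + broker.dlq.size());
    }
}
```

With a listener that always throws, this prints `deliveries=4, dlq size=1`: one initial delivery plus three redeliveries, after which the message ends up in the DLQ list.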
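What I tried
I tried all acknowledge modes (AUTO, CLIENT, DUPS_OK), as well as commit and acknowledge, but nothing worked; the message is not even redelivered.
I can't figure out what's wrong. There is nothing relevant in the JBoss log.
When I stop the MessageListener and run it again, I get: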
MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header {
jmsDestination : QUEUE.notificationQueue
jmsDeliveryMode : 2
jmsExpiration : 0
jmsPriority : 4
jmsMessageID : ID:13-13577584629501
jmsTimeStamp : 1357758462950
jmsCorrelationID: 20130109200742
jmsReplyTo : null
jmsType : null
jmsRedelivered : true
jmsProperties : {JMSXDeliveryCount=7, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=6}
jmsPropReadWrite: false
msgReadOnly : true
producerClientId: ID:13
}
} ), my message (2013-01-09 20:07:42)
MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header {
jmsDestination : QUEUE.notificationQueue
jmsDeliveryMode : 2
jmsExpiration : 0
jmsPriority : 4
jmsMessageID : ID:15-13577584942741
jmsTimeStamp : 1357758494274
jmsCorrelationID: 20130109200814
jmsReplyTo : null
jmsType : null
jmsRedelivered : true
jmsProperties : {JMSXDeliveryCount=6, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=5}
jmsPropReadWrite: false
msgReadOnly : true
producerClientId: ID:15
}
} ), my message (2013-01-09 20:08:14)
MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header {
jmsDestination : QUEUE.notificationQueue
jmsDeliveryMode : 2
jmsExpiration : 0
jmsPriority : 4
jmsMessageID : ID:20-13577586971991
jmsTimeStamp : 1357758697199
jmsCorrelationID: 20130109201137
jmsReplyTo : null
jmsType : null
jmsRedelivered : true
jmsProperties : {JMSXDeliveryCount=2, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=1}
jmsPropReadWrite: false
msgReadOnly : true
producerClientId: ID:20
}
} ), my message (2013-01-09 20:11:37)
MyMessageListener.onMessage( org.jboss.mq.SpyObjectMessage {
Header {
jmsDestination : QUEUE.notificationQueue
jmsDeliveryMode : 2
jmsExpiration : 0
jmsPriority : 4
jmsMessageID : ID:21-13577587683201
jmsTimeStamp : 1357758768320
jmsCorrelationID: 20130109201248
jmsReplyTo : null
jmsType : null
jmsRedelivered : true
jmsProperties : {JMSXDeliveryCount=2, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=1}
jmsPropReadWrite: false
msgReadOnly : true
producerClientId: ID:21
}
} ), my message (2013-01-09 20:12:48)
As you can see, I also tried JMS_JBOSS_REDELIVERY_LIMIT.
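The limit and the recorded count can be compared directly from the `jmsProperties` line of each dump. The following is a small sketch, assuming the properties are printed in the `{key=value, ...}` form shown in the output above; `DumpedPropertiesCheck`, `parse`, and `limitExceeded` are hypothetical helper names, not part of JBoss MQ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DumpedPropertiesCheck {

    /** Parses a "{k1=v1, k2=v2}" string as printed on the jmsProperties line of the dump. */
    static Map<String, String> parse(String dumped) {
        Map<String, String> props = new LinkedHashMap<String, String>();
        String inner = dumped.trim();
        if (inner.startsWith("{") && inner.endsWith("}")) {
            inner = inner.substring(1, inner.length() - 1);
        }
        for (String pair : inner.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                props.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return props;
    }

    /** True when the recorded redelivery count is already above the requested limit. */
    static boolean limitExceeded(Map<String, String> props) {
        int limit = Integer.parseInt(props.get("JMS_JBOSS_REDELIVERY_LIMIT"));
        int count = Integer.parseInt(props.get("JMS_JBOSS_REDELIVERY_COUNT"));
        return count > limit;
    }

    public static void main(String[] args) {
        // Property lines copied from the first and the last dump above.
        String first = "{JMSXDeliveryCount=7, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=6}";
        String last = "{JMSXDeliveryCount=2, throw=true, JMS_JBOSS_REDELIVERY_LIMIT=3, JMS_JBOSS_REDELIVERY_COUNT=1}";
        System.out.println(limitExceeded(parse(first)));
        System.out.println(limitExceeded(parse(last)));
    }
}
```

For the first dump this prints `true` (a count of 6 against a limit of 3), and for the last one `false` (a count of 1), so at least some messages are still on the queue after passing the limit I set.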
Any ideas?