我目前在使用 jms 同步请求/回复方法时遇到问题,这就是发生的情况:
1.)ProgramA创建一个jms消息,一个临时队列并将其设置为replyTo。
2.) ProgramB 有一个从 ProgramA 创建的消息的侦听器,处理消息并回复它。但是 ProgramB 需要与有时需要超过 10 秒才能回复的第 3 方 Web 服务进行通信,这就是我将消费者设置为侦听 5000(5 秒)的问题,当然之后它会超时。所以没有收到消息。
我的观察: 1.) 即使 ProgramA 已读完(还没有回复,那一刻我尝试删除临时队列)。它不能并且 ProgramB 仍然能够写入回复队列,但没有人会阅读该消息(为时已晚)。
当我尝试将 5s 更改为 20s 收听时间时,问题就解决了,但这是正确的方法吗?
当 ProgramA 停止读取时,ProgramB 是否有可能不尝试写入队列?
部分代码:
Destination replyQueue = send(jmsUtil, actionDTO);
SalesOrderResponseDTO responseDTO = readReply(jmsUtil, replyQueue, actionDTO);
public Destination send(JmsSessionUtil jmsUtil, SalesOrderActionDTO soDTO) {
try {
utx.begin();
jmsUtil.send(soDTO, null, 0L, 1, Long.parseLong(configBean.getProperty("jms.payrequest.timetolive")), true);
utx.commit();
return jmsUtil.getReplyQueue();
} catch (Exception e) {
try {
utx.rollback();
} catch (Exception e1) {
}
}
return null;
}
public SalesOrderResponseDTO readReply(JmsSessionUtil jmsUtil, Destination replyQueue, SalesOrderActionDTO actionDTO) {
SalesOrderResponseDTO responseDTO = null;
try {
utx.begin();
responseDTO = (SalesOrderResponseDTO) jmsUtil.read(replyQueue);
if (responseDTO != null) {
// fires the response event
SalesOrderResponsePayload eventPayload = new SalesOrderResponsePayload();
eventPayload.setResponseDTO(responseDTO);
responseEvent.fire(eventPayload);
} else { // timeout
((TemporaryQueue) replyQueue).delete();
jmsUtil.dispose();
}
utx.commit();
return responseDTO;
} catch (Exception e) {
try {
utx.rollback();
} catch (Exception e1) {
}
}
return responseDTO;
}
public String send(MessageDTO messageDTO,
JMSQueueEnum resultNotificationQueue, Long parentProcessId,
int JMSPriority, long timeToLive, boolean hasReply)
throws JMSException, InvalidDTOException, NamingException {
try {
// Process optional parameters
messageDTO.setResultNotificationQueue(resultNotificationQueue);
messageDTO.setParentProcessId(parentProcessId);
// Wrap MessageDTO in a JMS ObjectMessage
ObjectMessage msg = MessageDTOHelper.serialize(session, messageDTO);
msg.setJMSType(messageDTO.getClass().getSimpleName());
msg.setStringProperty("DTOType", messageDTO.getClass()
.getSimpleName());
requestProducer = session.createProducer(queue);
if (hasReply) {
replyQueue = session.createTemporaryQueue();
replyConsumer = session.createConsumer(replyQueue);
msg.setJMSReplyTo(replyQueue);
}
if (JMSPriority > -1) {
requestProducer.send(msg, DeliveryMode.PERSISTENT, JMSPriority,
timeToLive);
} else {
// Send the JMS message
requestProducer.send(msg);
}
return msg.getJMSMessageID();
} catch (Exception e) {
}
return null;
}
public MessageDTO read(Destination replyQueue) throws JMSException,
NamingException {
if (replyQueue instanceof Queue) {
Message msg = replyConsumer.receive(20000);
if (msg == null) {
return null;
}
MessageDTO messageDTO = MessageDTOHelper
.deserialize((ObjectMessage) msg);
return messageDTO;
} else {
}
return null;
}