1

我目前在使用 jms 同步请求/回复方法时遇到问题,这就是发生的情况:

1.)ProgramA创建一个jms消息,一个临时队列并将其设置为replyTo。

2.) ProgramB 有一个从 ProgramA 创建的消息的侦听器,处理消息并回复它。但是 ProgramB 需要与有时需要超过 10 秒才能回复的第 3 方 Web 服务进行通信,这就是我将消费者设置为侦听 5000(5 秒)的问题,当然之后它会超时。所以没有收到消息。

我的观察: 1.) 即使 ProgramA 已读完(还没有回复,那一刻我尝试删除临时队列)。它不能并且 ProgramB 仍然能够写入回复队列,但没有人会阅读该消息(为时已晚)。

当我尝试将 5s 更改为 20s 收听时间时,问题就解决了,但这是正确的方法吗?

当 ProgramA 停止读取时,ProgramB 是否有可能不尝试写入队列?

部分代码:

Destination replyQueue = send(jmsUtil, actionDTO);
SalesOrderResponseDTO responseDTO = readReply(jmsUtil, replyQueue, actionDTO);

public Destination send(JmsSessionUtil jmsUtil, SalesOrderActionDTO soDTO) {
    try {
        utx.begin();        
        jmsUtil.send(soDTO, null, 0L, 1, Long.parseLong(configBean.getProperty("jms.payrequest.timetolive")), true);
        utx.commit();
        return jmsUtil.getReplyQueue();
    } catch (Exception e) {
        try {
            utx.rollback();
        } catch (Exception e1) {

        }       
    }
    return null;
}

public SalesOrderResponseDTO readReply(JmsSessionUtil jmsUtil, Destination replyQueue, SalesOrderActionDTO actionDTO) {
    SalesOrderResponseDTO responseDTO = null;
    try {       
        utx.begin();

        responseDTO = (SalesOrderResponseDTO) jmsUtil.read(replyQueue);

        if (responseDTO != null) {
            // fires the response event
            SalesOrderResponsePayload eventPayload = new SalesOrderResponsePayload();
            eventPayload.setResponseDTO(responseDTO);
            responseEvent.fire(eventPayload);
        } else { // timeout
            ((TemporaryQueue) replyQueue).delete();
            jmsUtil.dispose();
        }
        utx.commit();
        return responseDTO;
    } catch (Exception e) {
        try {
            utx.rollback();
        } catch (Exception e1) {
        }
    }
    return responseDTO;
}

public String send(MessageDTO messageDTO,
            JMSQueueEnum resultNotificationQueue, Long parentProcessId,
            int JMSPriority, long timeToLive, boolean hasReply)
            throws JMSException, InvalidDTOException, NamingException {

    try {
        // Process optional parameters
        messageDTO.setResultNotificationQueue(resultNotificationQueue);
        messageDTO.setParentProcessId(parentProcessId);

        // Wrap MessageDTO in a JMS ObjectMessage
        ObjectMessage msg = MessageDTOHelper.serialize(session, messageDTO);
        msg.setJMSType(messageDTO.getClass().getSimpleName());
        msg.setStringProperty("DTOType", messageDTO.getClass()
                .getSimpleName());

        requestProducer = session.createProducer(queue);

        if (hasReply) {
            replyQueue = session.createTemporaryQueue();
            replyConsumer = session.createConsumer(replyQueue);     
            msg.setJMSReplyTo(replyQueue);
        }

        if (JMSPriority > -1) {
            requestProducer.send(msg, DeliveryMode.PERSISTENT, JMSPriority,
                    timeToLive);
        } else {
            // Send the JMS message
            requestProducer.send(msg);
        }
        return msg.getJMSMessageID();
    } catch (Exception e) {

    }

    return null;
}

public MessageDTO read(Destination replyQueue) throws JMSException,
            NamingException {
    if (replyQueue instanceof Queue) {
        Message msg = replyConsumer.receive(20000);

        if (msg == null) {
            return null;
        }

        MessageDTO messageDTO = MessageDTOHelper
                .deserialize((ObjectMessage) msg);

        return messageDTO;
    } else {

    }
    return null;
}
4

2 回答 2

0

您可以让 A 在其消息中添加一个带有当前时间戳 + 5 秒的标头。当 B 收到第 3 方的响应时,如果当前时间大于 header,它应该丢弃结果而不发送。您可以为此使用 time-to-live jms 消息属性,尽管这不是它的明确目的。

于 2013-02-15T11:46:18.393 回答
0

这里的实际问题是您需要同步还是异步通信。

我总是更喜欢异步,从你的问题看来,在你的情况下也不需要同步通信。但是,如果出于某种原因进行同步,那么您将陷入临时队列 - 您必须指定超时间隔,并且您将面临问题中表达的问题。如果程序 A 可以等待,请提高超时间隔,尽管这远非最佳。据我所知,程序 B 不可能检查 A 是否还在听。

在异步通信的情况下,您(至少)有两个 JMS 选项:

  1. 使用不同的消息队列 - 程序 A 在 Queue1 上发送消息并完成,但在 Queue2 上侦听(例如,通过消息驱动 Bean),程序 B 在完成后将其响应。小缺点是使用了一对额外的生产者和消费者。
  2. 使用相同的消息队列 - 程序 A 和程序 B 都在 Queue1 上发送和接收消息,但使用不同的消息选择器(参见此处的描述)。基本上,消息选择器将过滤特定侦听器的消息,从而允许使用相同的队列进行双向通信。

也可以看看:

于 2013-02-15T08:41:40.757 回答