24

如果我们使用“临时队列”来使用 JMS 请求/回复机制,那么该代码是否具有可扩展性?

到目前为止,我们不知道我们是否会支持每秒 100 个请求,或者每秒 1000 个请求。

下面的代码是我正在考虑实现的。它以“同步”方式使用 JMS。关键部分是创建“消费者”以指向为此会话创建的“临时队列”的位置。我只是不知道使用这种临时队列是否是一种可扩展的设计。

  destination = session.createQueue("queue:///Q1");
  producer = session.createProducer(destination);
  tempDestination = session.createTemporaryQueue();
  consumer = session.createConsumer(tempDestination);

  long uniqueNumber = System.currentTimeMillis() % 1000;
  TextMessage message = session
      .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber);

  // Set the JMSReplyTo
  message.setJMSReplyTo(tempDestination);

  // Start the connection
  connection.start();

  // And, send the request
  producer.send(message);
  System.out.println("Sent message:\n" + message);

  // Now, receive the reply
  Message receivedMessage = consumer.receive(15000); // in ms or 15 seconds
  System.out.println("\nReceived message:\n" + receivedMessage);

更新:

我遇到了另一种模式,请参阅此博客 该想法是对发送和接收使用“常规”队列。但是对于“同步”调用,为了获得所需的响应(即匹配请求),您创建一个使用“选择器”侦听接收队列的消费者。

脚步:

    // 1. Create Send and Receive Queue.
    // 2. Create a msg with a specific ID
 final String correlationId = UUID.randomUUID().toString();
 final TextMessage textMessage = session.createTextMessage( msg );
 textMessage.setJMSCorrelationID( correlationId );

    // 3. Start a consumer that receives using a 'Selector'.
           consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );

所以这种模式的不同之处在于我们不会为每个新请求创建一个新的临时队列。相反,所有响应都只到达一个队列,但使用“选择器”来确保每个请求线程都接收到唯一关心的响应。

我认为这里的缺点是您必须使用“选择器”。我还不知道这是否比前面提到的模式更不受欢迎或更受欢迎。想法?

4

7 回答 7

6

关于您的帖子中的更新 - 如果在消息头上执行选择器非常有效,就像您使用 Correlation ID 所做的那样。Spring Integration也在内部执行此操作以实现 JMS 出站网关

于 2012-05-29T01:08:41.183 回答
4

有趣的是,它的可扩展性实际上可能与其他响应所描述的相反。

WebSphere MQ 尽可能保存和重用动态队列对象。因此,虽然动态队列的使用不是免费的,但它确实可以很好地扩展,因为当队列被释放时,WMQ 需要做的就是将句柄传递给下一个请求新队列实例的线程。在繁忙的 QMgr 中,动态队列的数量将保持相对静态,而句柄从线程传递到线程。严格来说,它不如重用单个队列那么快,但也不错。

另一方面,尽管索引CORRELID速度很快,但性能与索引中的消息数量成反比。如果队列深度开始建立,它也会有所不同。当应用程序GETWAIT一个空队列上运行时,没有延迟。但是在深度队列中,QMgr 必须搜索现有消息的索引以确定回复消息不在其中。在您的示例中,这就是每秒搜索空索引与搜索大索引 1,000 次之间的区别。

结果是 1000 个动态队列,每个队列包含一条消息,实际上可能比具有 1000 个线程的单个队列更快CORRELID,具体取决于应用程序和负载的特性。我建议在进行特定设计之前对其进行大规模测试。

于 2013-02-09T07:24:58.950 回答
2

在共享队列上的相关 ID 上使用选择器将非常适合多个消费者。

然而,1000 个请求/秒会很多。如果性能出现问题,您可能希望在不同实例之间稍微分配负载。

您可能想详细说明请求与客户数量。如果客户端数量小于 10 并且将保持相当静态,并且请求数量非常高,那么最具弹性和快速的解决方案可能是为每个客户端设置静态回复队列。

于 2012-05-29T10:40:17.877 回答
1

创建临时队列不是免费的。毕竟它是在代理上分配资源。话虽如此,如果您有一个未知的(事先)可能不受限制的客户端数量(多个 JVM、每个 JVM 多个并发线程等),您可能别无选择。每次分配客户端队列并将它们分配给客户端将很快失控。

当然,您所描绘的是最简单的解决方案。如果你能得到交易量的真实数字并且它的规模足够大,那很好。

在我考虑避免临时排队之前,我会更多地考虑限制客户数量并让客户长寿。也就是在客户端创建一个客户端池,让池中的客户端在启动时创建临时队列、会话、连接等,在后续请求中复用,在关闭时拆除。然后调整问题成为池的最大/最小大小之一,修剪池的空闲时间是什么,以及当池最大化时的行为是什么(失败与阻塞)。除非您正在创建任意数量的瞬态 JVM(在这种情况下,您会因为 JVM 启动开销而遇到更大的扩展问题),否则它应该可以扩展。毕竟,此时您分配的资源反映了系统的实际使用情况。

要避免的事情是创建和销毁大量无故的队列、会话、连接等。设计服务器端以允许从一开始就进行流式传输。然后在需要时/在需要时进行池化。就像不是一样,对于任何重要的事情,您都需要这样做。

于 2012-05-28T15:50:23.623 回答
0

使用临时队列每次都会花费创建relyToProducers。与为静态replyToQueue 使用缓存的生产者不同,createProducer 方法成本更高,并且会影响高度并发调用环境中的性能。

于 2014-07-02T04:23:15.137 回答
0

我一直面临同样的问题,并决定自己在无状态 bean 中汇集连接。一个客户端连接有一个 tempQueue,并位于 JMSMessageExchanger 对象(包含 connectionFactory、Queue 和 tempQueue)中,该对象绑定到一个 bean 实例。我已经在 J​​SE/EE 环境中对其进行了测试。但我不太确定 Glassfish JMS 池的行为。它真的会关闭 JMS 连接,在 bean 方法结束后“手动”获得吗?我做错了什么吗?

我还关闭了客户端 bean (TransactionAttributeType.NOT_SUPPORTED) 中的事务以立即将请求消息发送到请求队列。

package net.sf.selibs.utils.amq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import lombok.Getter;
import lombok.Setter;
import net.sf.selibs.utils.misc.UHelper;

public class JMSMessageExchanger {

    @Setter
    @Getter
    protected long timeout = 60 * 1000;

    public JMSMessageExchanger(ConnectionFactory cf) {
        this.cf = cf;
    }

    public JMSMessageExchanger(ConnectionFactory cf, Queue queue) {
        this.cf = cf;
        this.queue = queue;
    }
    //work
    protected ConnectionFactory cf;
    protected Queue queue;
    protected TemporaryQueue tempQueue;
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected MessageConsumer consumer;
    //status
    protected boolean started = false;
    protected int mid = 0;

    public Message makeRequest(RequestProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
                this.tempQueue = this.session.createTemporaryQueue();
                this.consumer = this.session.createConsumer(tempQueue);
            }
            //send request
            Message requestM = producer.produce(this.session);
            mid++;
            requestM.setJMSCorrelationID(String.valueOf(mid));
            requestM.setJMSReplyTo(this.tempQueue);
            this.producer.send(this.queue, requestM);
            //get response
            while (true) {
                Message responseM = this.consumer.receive(this.timeout);
                if (responseM == null) {
                    return null;
                }
                int midResp = Integer.parseInt(responseM.getJMSCorrelationID());
                if (mid == midResp) {
                    return responseM;
                } else {
                    //just get other message
                }
            }

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    public void makeResponse(ResponseProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            Message response = producer.produce(this.session);
            response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID());
            this.producer.send(producer.getRequest().getJMSReplyTo(), response);

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    protected void init() throws Exception {
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(null);
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        this.connection.start();
        this.started = true;
    }

    public void close() {
        UHelper.close(producer);
        UHelper.close(consumer);
        UHelper.close(session);
        UHelper.close(connection);
        this.started = false;
    }

}

在客户端(无状态 bean)和服务器(@MessageDriven)中使用相同的类。RequestProducer 和 ResponseProducer 是接口:

package net.sf.selibs.utils.amq;

import javax.jms.Message;
import javax.jms.Session;

public interface RequestProducer {
    Message produce(Session session) throws Exception;
}
package net.sf.selibs.utils.amq;

import javax.jms.Message;

public interface  ResponseProducer extends RequestProducer{
    void setRequest(Message request);
    Message getRequest();
}

我还阅读了关于通过 AMQ 实现请求响应的 AMQ 文章:http: //activemq.apache.org/how-should-i-implement-request-response-with-jms.html

于 2015-10-02T06:43:36.713 回答
-1

也许我来得太晚了,但我这周花了几个小时来让同步请求/回复在 JMS 中工作。用超时扩展 QueueRequester 怎么样。我做到了,至少在一台机器(运行代理、请求者和回复者)上进行了测试,结果表明该解决方案优于所讨论的解决方案。另一方面,它取决于使用 QueueConnection,这意味着您可能被迫打开多个连接。

于 2013-02-08T16:11:43.197 回答