2

我正在启动一个需要高度可扩展的 Java EE 项目。到目前为止,这个概念是:

  • 几个 Message Driven Bean,负责架构的不同部分
  • 每个 MDB 都注入一个 Session Bean,处理业务逻辑
  • 几个实体 Bean,提供对持久层的访问
  • 通过 JMS 消息通过请求/回复概念在架构的不同部分之间进行通信:
    • MDB 接收到包含活动请求的 msg
    • 使用其会话 bean 执行必要的业务逻辑
    • 将 msg 中的响应对象返回给原始请求者

这个想法是,通过消息总线将架构的各个部分相互分离,对可扩展性没有限制。只需启动更多组件——只要它们连接到同一条总线,我们就可以不断增长。

不幸的是,我们在请求-回复概念上遇到了很多问题。Transaction Mgmt 似乎阻碍了我们的工作。似乎会话 bean 不应该使用消息?!

阅读http://blogs.oracle.com/fkieviet/entry/request_reply_from_an_ejbhttp://forums.sun.com/message.jspa?messageID=10338789,我觉得人们实际上反对请求/回复概念EJB。

如果是这种情况,您如何EJB 之间进行通信?(请记住,可扩展性是我所追求的)

我当前设置的详细信息:

  • MDB 1 'TestController',使用(本地)SLSB 1 'TestService' 进行业务逻辑
  • TestController.onMessage() 使 TestService 向队列 XYZ 发送消息并请求回复
    • TestService 使用 Bean 托管事务
    • TestService 在初始化时通过联合连接工厂建立到 JMS 代理的连接和会话(@PostConstruct)
    • TestService 在发送后提交事务,然后开始另一个事务并等待 10 秒的响应
  • 消息到达 MDB 2 'LocationController',它使用(本地)SLSB 2 'LocationService' 进行业务逻辑
  • LocationController.onMessage() 使 LocationService 将消息发送请求的 JMSReplyTo 队列
    • 相同的 BMT 概念,相同的 @PostConstruct 概念
  • 都使用相同的连接工厂来访问代理

问题:第一条消息被发送(通过 SLSB 1)和接收(通过 MDB 2)正常。返回消息的发送(通过 SLSB 2)也很好。然而,SLSB 1 从不接收任何东西——它只是超时。

我试过没有messageSelector,没有变化,仍然没有收到消息。

通过会话 bean 消费消息不好吗?

SLSB 1 - TestService.java

@Resource(name = "jms/mvs.MVSControllerFactory")
private javax.jms.ConnectionFactory connectionFactory;

@PostConstruct
public void initialize() {
    try {
      jmsConnection = connectionFactory.createConnection();
      session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("Connection to JMS Provider established");
    } catch (Exception e) { }
}

public Serializable sendMessageWithResponse(Destination reqDest, Destination respDest, Serializable request) {
    Serializable response = null;

    try {
        utx.begin();
        Random rand = new Random();
        String correlationId = rand.nextLong() + "-" + (new Date()).getTime();

        // prepare the sending message object
        ObjectMessage reqMsg = session.createObjectMessage();
        reqMsg.setObject(request);
        reqMsg.setJMSReplyTo(respDest);
        reqMsg.setJMSCorrelationID(correlationId);

        // prepare the publishers and subscribers
        MessageProducer producer = session.createProducer(reqDest);

        // send the message
        producer.send(reqMsg);
        System.out.println("Request Message has been sent!");
        utx.commit();

        // need to start second transaction, otherwise the first msg never gets sent
        utx.begin();
        MessageConsumer consumer = session.createConsumer(respDest, "JMSCorrelationID = '" + correlationId + "'");
        jmsConnection.start();
        ObjectMessage respMsg = (ObjectMessage) consumer.receive(10000L);
        utx.commit();

        if (respMsg != null) {
            response = respMsg.getObject();
            System.out.println("Response Message has been received!");
        } else {
            // timeout waiting for response
            System.out.println("Timeout waiting for response!");
        }

    } catch (Exception e) { }

    return response;
}

SLSB 2 - LocationService.Java(只有回复方式,其余同上)

public boolean reply(Message origMsg, Serializable o) {
    boolean rc = false;

    try {
        // check if we have necessary correlationID and replyTo destination
        if (!origMsg.getJMSCorrelationID().equals("") && (origMsg.getJMSReplyTo() != null)) {
            // prepare the payload
            utx.begin();
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject(o);

            // make it a response
            msg.setJMSCorrelationID(origMsg.getJMSCorrelationID());
            Destination dest = origMsg.getJMSReplyTo();

            // send it
            MessageProducer producer = session.createProducer(dest);
            producer.send(msg);
            producer.close();
            System.out.println("Reply Message has been sent");
            utx.commit();

            rc = true;
        }

    } catch (Exception e) {}

    return rc;
}

太阳资源.xml

<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerRequest"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerRequestQueue"/>
</admin-object-resource>
<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerResponse"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerResponseQueue"/>
</admin-object-resource>

<connector-connection-pool name="jms/mvs.MVSControllerFactoryPool"  connection-definition-name="javax.jms.QueueConnectionFactory"  resource-adapter-name="jmsra"/>
<connector-resource enabled="true" jndi-name="jms/mvs.MVSControllerFactory" pool-name="jms/mvs.MVSControllerFactoryPool"  />
4

1 回答 1

1

请求/回复模式,即使使用 JMS,本质上仍然是同步的。调用者发送一条消息,然后等待回复。这不仅因为分布式事务而变得复杂,而且还意味着在等待回复时,会分配和浪费一个或多个资源(至少在这种情况下是线程)。您不能以这种方式扩展:您天生受到线程数量的限制。

要拥有真正可扩展的 JMS 架构,一切都必须是异步的。换句话说:你永远不应该等待。发送和接收的消息应该传递必要的信息来触发下一个活动。

如果消息的大小太大,您可以只存储一个标识符并将相应的数据存储在数据库中。但随后数据库再次成为争论的焦点。

如果不同的消息需要知道它们参与了哪个长时间运行的进程,您还可以使用相关标识符。当接收到消息时,接收方可以使用相关标识符“恢复”长时间运行的活动。这是 BPEL 的传统模式。同步请求/回复和带有相关标识符的异步消息之间的主要区别在于可以在每个步骤之间释放资源。您可以使用后者进行扩展,但不能使用第一个。

老实说,我对您的长篇文章感到困惑,并且不明白您的设计是相当异步(并且正确),还是与请求/回复同步(并且有问题)。但我希望我提供了一些答案。

无论如何,请访问Enterprise Integration Patterns网站,这是一个有价值的信息来源。

于 2010-03-23T11:54:00.420 回答