5

对于以下场景,我正在寻找您关于最佳实践的建议和技巧:

在分布式(主要基于 Java)系统中:

  • 许多(不同的)客户端应用程序(网络应用程序、命令行工具、REST API)
  • 中央 JMS 消息代理(目前支持使用 ActiveMQ)
  • 多个独立处理节点(在多台远程机器上运行,计算 JMS 消息有效负载指定的不同类型的昂贵操作)

如何最好地应用Spring Integration框架提供的 JMS 支持来将客户端与工作节点分离?在阅读参考文档和一些最初的实验时,看起来 JMS 入站适配器的配置本质上需要使用订阅者,而在解耦场景中不存在订阅者。

小注:应该通过 JMS 文本消息进行通信(使用 JSON 数据结构以实现未来的可扩展性)。

4

3 回答 3

4

这并不能真正回答您的问题,但请确保您查看Apache Camel以连接您的不同组件。我发现它对于将 JMS 队列连接到现有 Web 服务并计划将其用于其他组件非常有用。

监控 ActiveMQ 队列中的消息、转换它们并将它们发布到 Web 服务的示例:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:camel="http://camel.apache.org/schema/spring"
   xsi:schemaLocation="
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring-2.3.0.xsd">

<bean id="callbackProcessor" class="com.package.CallbackProcessor"/>

<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="jmsFactory" />
</bean>

<camel:camelContext id="camel">
    <!-- Must put this in camel:endpoint because camel:from doesn't support property substitution -->
    <camel:endpoint id="callbackQueue" uri="activemq:queue:${jms.callback-queue-name}"/>
    <camel:route>
        <camel:from ref="callbackQueue"/>
        <camel:process ref="callbackProcessor"/>
        <camel:to uri="http://dummy"/><!-- This will be replaced by the callbackProcessor with the callback URL in the message -->
    </camel:route>
</camel:camelContext>
</beans>

这就是我们的 Spring 应用程序启动 Camel 并开始处理消息所必需的一切。

于 2010-06-10T08:38:19.817 回答
3

这是我今天提出的Spring Integration,如果您发现可以改进的地方,请跟进。

在客户端,可以通过 SimpleMessagingGateway 发送和接收消息:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"   
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"   
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <import resource="integration-common.xml"/>

    <!-- Communication Gateway for the Client (send/receive) -->
    <bean id="gateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
        <property name="requestChannel" ref="SenderChannel"/>
        <property name="replyChannel" ref="InboundChannel"/>
        <property name="replyTimeout" value="1000"/>
    </bean><!-- TODO: could use integration:gateway -->

    <!-- Sending out message to JMS request queue -->
    <integration:channel id="SenderChannel"/>
    <jms:outbound-channel-adapter
                        channel="SenderChannel" 
                        destination="requestQueue" />

    <!-- Listen to incoming messages on JMS reply queue -->
    <integration:channel id="InboundChannel">
        <integration:queue/>
    </integration:channel>
    <jms:message-driven-channel-adapter
            destination="replyQueue"
            channel="InboundChannel" />

</beans>

处理节点端的配置如下所示(有关 Spring Integration 元素的更多解释,请参见内联注释):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"   
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"   
    xmlns:stream="http://www.springframework.org/schema/integration/stream" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <import resource="integration-common.xml"/>

    <!-- Read in Message Endpoint Service Activator classes --> 
    <context:component-scan base-package="sample.integration.jmsbasic"/>

    <!-- Listen to incoming messages on the JMS request queue -->
    <integration:channel id="jmsinToProcChannel"/>
    <jms:message-driven-channel-adapter
            destination="requestQueue"
            channel="jmsinToProcChannel"/>

    <!-- Delegate message to service implementation and take care of answer -->
    <integration:service-activator 
            input-channel="jmsinToProcChannel" 
            ref="procService"
            output-channel="jmsBackChannel" />

    <!-- Send answer back to JMS reply queue -->
    <integration:channel id="jmsBackChannel"/>
    <jms:outbound-channel-adapter
                        channel="jmsBackChannel" 
                        destination="replyQueue" />

</beans>
于 2010-06-10T21:58:16.257 回答
1

您是否在问是否可以使用 Spring Integration 来实现协议?那么答案是肯定的,而且非常简单。

于 2010-06-10T09:24:01.253 回答