3

我正在为我的公司编写电子邮件网络服务。主要要求之一是保证交付,因此我们在 JMS 传输上使用了一个薄的 HTTP 层,使用持久性 QPid 队列。

我遇到的问题之一是处理过程中的错误处理。如果我在出现错误时简单地回滚事务,则消息将转到队列的头部。如果错误足够普遍,这可能会锁定整个队列,直到有人手动干预,我们希望通过回发到头部来避免这种情况,以便同时处理消息。

然而,这就是我的问题。首先,虽然 AMQP 有一种机制来原子地“拒绝和重新排队”消息而不是确认消息,但 JMS 似乎没有任何类似的功能,所以访问它的唯一方法是通过强制转换,这将我与具体的 JMS 实现。此外,CXF 的 JMS 传输似乎没有任何方法可以在传输级别覆盖或注入行为,这意味着我要么编写字节码代理要么更改代码并重新编译以获得我想要的行为。

为了解决这个问题,我尝试了在 CXF 中实现一个错误处理程序的想法,它只是从 CXF 消息中重建 JMS 消息,然后重新排队。但是我不能使用事务处理的会话,因为故障导致我无法覆盖的回滚,然后我将在头部(来自回滚)和尾部(从重新排队)。而且我不能使用 CLIENT_ACKNOWLEDGE,因为 JMS 传输在提交消息进行处理之前会确认消息,这意味着如果服务器在错误的时间关闭,我可能会丢失消息。

所以基本上,只要我坚持接受 JMS 传输的默认行为,似乎不可能在不影响数据完整性的情况下获得我想要的行为(失败消息的重新排队)。

一位同事建议完全避开 JMS 传输,直接调用队列。然后,服务实现将是一个框架类,它的存在仅用于将消息放入队列,另一个进程将实现一个消息侦听器。对我来说,这个解决方案不是最理想的,因为我失去了不可知 Web 服务的优雅,并且由于将我的实现与底层技术耦合而失去了一些可伸缩性。

我还考虑过使用 RabbitMQ 客户端库为 AMQP 编写 CXF 传输。这将需要更长的时间,但我们公司可以继续使用它,也许可以回馈给 CXF 项目。也就是说,我对这个想法并不感兴趣,因为编写、运行和测试代码需要花费大量时间。

这是我的 CXF beans.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:jms="http://cxf.apache.org/transports/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/util      http://www.springframework.org/schema/util/spring-util.xsd
    http://www.springframework.org/schema/context   http://www.springframework.org/schema/context/spring-context.xsd
    http://cxf.apache.org/jaxrs                     http://cxf.apache.org/schemas/jaxrs.xsd
    http://cxf.apache.org/transports/jms            http://cxf.apache.org/schemas/configuration/jms.xsd">

    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />

    <context:component-scan base-package="com.edo" />

    <bean id="jmsConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
        <constructor-arg name="broker" value="tcp://localhost:5672"/>
        <constructor-arg name="username" value="guest"/>
        <constructor-arg name="password" value="guest"/>
        <constructor-arg name="clientName" value=""/>
        <constructor-arg name="virtualHost" value=""/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:explicitQosEnabled="true" p:deliveryMode="1" p:timeToLive="5000" p:connectionFactory-ref="jmsConnectionFactory" p:sessionTransacted="false" p:sessionAcknowledgeModeName="CLIENT_ACKNOWLEDGE" />
    <bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration" p:connectionFactory-ref="jmsConnectionFactory" p:wrapInSingleConnectionFactory="false" p:jmsTemplate-ref="jmsTemplate" p:timeToLive="500000" p:sessionTransacted="false" p:concurrentConsumers="1" p:maxSuspendedContinuations="0" p:maxConcurrentConsumers="1" />

    <jms:destination id="jms-destination-bean" name="{http://test.jms.jaxrs.edo.com/}HelloWorldImpl.jms-destination">
        <jms:address jndiConnectionFactoryName="ConnectionFactory" jmsDestinationName="TestQueue">
            <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
            <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:5672"/>
        </jms:address>
        <jms:jmsConfig-ref>jmsConfig</jms:jmsConfig-ref>
    </jms:destination>

    <jaxrs:server id="helloWorld" address="/HelloWorld" transportId="http://cxf.apache.org/transports/jms">
      <jaxrs:serviceBeans>
        <ref bean="helloWorldBean"/>
      </jaxrs:serviceBeans>
      <jaxrs:inInterceptors>
        <bean class="com.edo.jaxrs.jms.test.FlowControlInInterceptor" p:periodMs="1000" p:permitsPerPeriod="18" />
      </jaxrs:inInterceptors>
      <jaxrs:providers>
        <bean class="org.apache.cxf.jaxrs.provider.JSONProvider">
          <property name="produceMediaTypes" ref="jsonTypes"/>
          <property name="consumeMediaTypes" ref="jsonTypes"/>
        </bean>
      </jaxrs:providers>
    </jaxrs:server>

    <bean id="http-jms-config" class="com.edo.jaxrs.jms.test.HttpOverJmsConfig" 
        p:jmsFactory-ref="jmsConnectionFactory" 
        p:jmsDestinationName="TestQueue" />

    <util:list id="jsonTypes">
      <value>application/json</value>
      <value>application/jettison</value>
    </util:list>

</beans>

我缺少一些简单的东西吗?或者有没有更好的方法来解决这个问题,可以回避这个问题?

4

1 回答 1

0

所以 - 我正在接受我同事的建议,而不是使用 JMS 传输来进行 Web 服务。相反,我们将在 Spring Integration 上创建一个瘦 Web 服务层。这应该允许我们获得所需的控制粒度,而无需暴露消息传递层。

于 2011-12-13T15:34:38.497 回答