-1

我正在使用带有activemq的apache骆驼。我已经配置了运行 exe 文件的消费者。这个 exe 文件将反过来运行一个应用程序。

我希望消息在队列中,直到这个过程发生。所以,如果我的 exe 崩溃/orexe 被重命名等,我希望消息在队列中。同样,如果 exe 工作正常并且它调用应用程序并且应用程序发生了一些事情,即使我需要将消息保留在队列中。如果这两个过程成功发生,那么我的消息可以出队。

我已经使用简单的 java 代码使用会话和重新传递策略编写了这个逻辑。但我想用spring框架重写(camel是用这个集成的),我不知道如何去做。PS我不是在寻找诸如消息未传递到exe之类的错误,而是诸如如何在上面解释的不测事件中保留消息之类的问题

activemqcamel.xml 配置如下。我发布此内容的原因是配置没有连接工厂等,因此 apache camel 应该以某种方式创建会话。

<camelContext id="activeContext" xmlns="http://camel.apache.org/schema/spring">
  <routeBuilder ref="activeMQRouter" />
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:61616?          jms.prefetchPolicy.queuePrefetch=1" />
</bean>

  <bean id="activeMQRouter"      class="main.java.com.aranin.activemq.ActiveMQRouterBuilder"/>

  <bean id="activemqProcessor" class="main.java.com.aranin.activemq.ActiveMQProcessor"/>

**关于我如何实现回滚的旧代码* * ** * ** * ***

package PackageName;
import java.io.File;
import java.io.IOException;
import javax.jms.*;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
 public class FirstConsumer extends HttpServlet {
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException{
try {
//creating connectionfactory object for way
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617?jms.prefetchPolicy.queuePrefetch=1");
//creating object for RedeliveryPolicy clas
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
//establishing the connection b/w this Application and Activemq
Connection connection=connectionFactory.createConnection();
//connection.setClientID("Testing");
final Session session=connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue=session.createQueue("ThermalMap");
javax.jms.MessageConsumer consumer=session.createConsumer(queue);
//Creating FunctionNames class object for calling integration method.
final FunctionNames functionNamesObject=new FunctionNames();
//Listening queues from Activemq
MessageListener listener = new MessageListener() {
    @Override
    public void onMessage(Message msg) {
        //getting message, and converting message object to Textmessage object
        TextMessage msg1=(TextMessage)msg;
        try {
            functionNamesObject.Integration(
                                            ".txt",
                                            getServletContext().getRealPath("/Copy"),
                                            getServletContext().getRealPath("/Rod1"),
                                            "ThMapInfratab1-2.exe",
                                            "TMapInput.txt"
            );
            //Triggering MMA Application
            //functionNamesObject.executeHttp("http://localhost:8080/SlicerApp/index.jsp");
            session.commit();
        }
        catch (IOException e) {
            try {
                session.rollback();
            } catch (JMSException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }
        catch (InterruptedException e) {
            try {
                session.rollback();
            } catch (JMSException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        } catch (JMSException e) {
            try {
                session.rollback();
            } catch (JMSException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }       
    }
};
consumer.setMessageListener(listener);
connection.start();    
//session.rollback();
}
catch (Exception e) {
e.printStackTrace();
}
   }
}
4

1 回答 1

0

阅读有关交易的文档:http ://camel.apache.org/transactional-client.html

于 2013-09-17T14:31:59.483 回答