我正在结合 ActiveMQ 使用 Apache Camel。我配置了一个消费者,它会运行一个 exe 文件,而这个 exe 文件又会启动一个应用程序。
我希望在整个过程完成之前,消息一直保留在队列中。也就是说,如果我的 exe 崩溃,或者 exe 被重命名等,消息应当留在队列中;同样,如果 exe 工作正常并调用了应用程序,但应用程序出了问题,消息也需要保留在队列中。只有这两个步骤都成功完成,消息才可以出队。
我已经用普通的 Java 代码,通过会话(session)和重新投递策略(redelivery policy)实现了这个逻辑。但我想用 Spring 框架重写(Camel 是与 Spring 集成的),却不知道该怎么做。PS:我要找的不是“消息未传递到 exe”之类的错误,而是如何在上面所说的意外情况下把消息保留在队列中。
activemqcamel.xml 的配置如下。我之所以贴出这段内容,是因为该配置中没有连接工厂等,因此会话应该是由 Apache Camel 以某种方式创建的。
<!-- Camel context; its routes come from the activeMQRouter bean declared below. -->
<camelContext id="activeContext" xmlns="http://camel.apache.org/schema/spring">
<routeBuilder ref="activeMQRouter" />
</camelContext>
<!-- ActiveMQ component for Camel. queuePrefetch=1 is passed to the broker connection as a prefetch-policy option. -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<!-- The URL must not contain whitespace: a space after '?' makes it an invalid URI. -->
<property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
</bean>
<!-- Route builder referenced by the Camel context above. -->
<bean id="activeMQRouter" class="main.java.com.aranin.activemq.ActiveMQRouterBuilder"/>
<!-- NOTE(review): presumably the processor used by the routes in ActiveMQRouterBuilder; confirm against that class. -->
<bean id="activemqProcessor" class="main.java.com.aranin.activemq.ActiveMQProcessor"/>
**以下是我实现回滚的旧代码**
package PackageName;
import java.io.File;
import java.io.IOException;
import javax.jms.*;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
/**
 * Servlet that starts a transacted consumer on the {@code ThermalMap} queue.
 *
 * <p>For each delivered message the listener calls {@code FunctionNames.Integration(...)} and
 * commits the session only if that call returns normally. Any failure rolls the session back so
 * the broker redelivers the message according to the configured {@link RedeliveryPolicy}.
 *
 * <p>The consumer is started by the first request that reaches {@link #service} and is shut down
 * in {@link #destroy()}. Later requests are no-ops, so repeated requests no longer leak a new
 * connection and consumer each time.
 */
public class FirstConsumer extends HttpServlet {

    /** Connection owned by this servlet; {@code null} until the consumer is started. Guarded by {@code this}. */
    private transient Connection connection;

    /**
     * Starts the queue consumer on first use. Set-up failures are logged and not propagated, so
     * the request itself never fails because of the broker.
     *
     * @param request  the incoming request (not inspected)
     * @param response the response (left untouched)
     */
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        try {
            synchronized (this) {
                if (connection != null) {
                    // Consumer already running; do not open another connection.
                    return;
                }

                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "admin", "admin", "tcp://localhost:61617?jms.prefetchPolicy.queuePrefetch=1");

                // Retry failed messages forever, starting one second after the first failure.
                RedeliveryPolicy policy = new RedeliveryPolicy();
                policy.setInitialRedeliveryDelay(1000L);
                policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
                connectionFactory.setRedeliveryPolicy(policy);
                connectionFactory.setUseRetroactiveConsumer(true);

                Connection newConnection = connectionFactory.createConnection();
                boolean started = false;
                try {
                    // The acknowledge mode is ignored for transacted sessions, so pass the
                    // constant that says what actually happens.
                    final Session session = newConnection.createSession(true, Session.SESSION_TRANSACTED);
                    Queue queue = session.createQueue("ThermalMap");
                    javax.jms.MessageConsumer consumer = session.createConsumer(queue);

                    final FunctionNames functionNamesObject = new FunctionNames();

                    consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message msg) {
                            try {
                                functionNamesObject.Integration(
                                        ".txt",
                                        getServletContext().getRealPath("/Copy"),
                                        getServletContext().getRealPath("/Rod1"),
                                        "ThMapInfratab1-2.exe",
                                        "TMapInput.txt");
                                // NOTE(review): the follow-up HTTP trigger (executeHttp) is currently
                                // disabled; if re-enabled it must run before commit() so that its
                                // failure also keeps the message on the queue.
                                session.commit();
                            } catch (IOException e) {
                                rollbackQuietly(session, e);
                            } catch (InterruptedException e) {
                                rollbackQuietly(session, e);
                                // Restore the flag only after the rollback, which performs broker I/O.
                                Thread.currentThread().interrupt();
                            } catch (JMSException e) {
                                rollbackQuietly(session, e);
                            } catch (RuntimeException e) {
                                // Without this, an unchecked failure would leave the message inside
                                // the open transaction and it could be committed with the next one.
                                rollbackQuietly(session, e);
                            }
                        }
                    });

                    newConnection.start();
                    started = true;
                } finally {
                    if (!started) {
                        // Do not leak a half-initialised connection.
                        closeQuietly(newConnection);
                    }
                }
                connection = newConnection;
            }
        } catch (Exception e) {
            log("Failed to start the ThermalMap queue consumer", e);
        }
    }

    /** Closes the broker connection (and with it the session and consumer) when the servlet is unloaded. */
    @Override
    public void destroy() {
        synchronized (this) {
            closeQuietly(connection);
            connection = null;
        }
        super.destroy();
    }

    /** Logs {@code cause} and rolls the session back; a rollback failure is logged, not thrown. */
    private void rollbackQuietly(Session session, Exception cause) {
        log("Message processing failed; rolling back so the message stays on the queue", cause);
        try {
            session.rollback();
        } catch (JMSException rollbackFailure) {
            log("Could not roll back the JMS session", rollbackFailure);
        }
    }

    /** Closes {@code toClose} if non-null; a close failure is logged, not thrown. */
    private void closeQuietly(Connection toClose) {
        if (toClose == null) {
            return;
        }
        try {
            toClose.close();
        } catch (JMSException closeFailure) {
            log("Could not close the JMS connection", closeFailure);
        }
    }
}