I am trying to move messages from queue1 (a dead letter queue) to queue2 in ActiveMQ at a periodic interval of 5 minutes using a Camel route. I am using the code below to achieve this:

    public class MessageRouteBuilder extends RouteBuilder {

    private static final Logger LOG =
            LoggerFactory.getLogger(MessageRouteBuilder.class);

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.camel.builder.RouteBuilder#configure()
     */
    @Override
    public void configure() throws Exception {
        LOG.info("Routing of camel is started");
        CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
        startPolicy.setRouteStartTime("0 0/5 * * * ?");

        from(
            "jms:queue:DLQ.Consumer.OUTDOCS.VirtualTopic.queue1")
                .routeId("DLQMessageMoverID").routePolicy(startPolicy)
                .noAutoStartup()
                .to("jms:queue:Consumer.OUTDOCS.VirtualTopic.queue1");
        LOG.info("Routing of camel is done");

    }

}


@Startup
@Singleton
public class ScheduledMessageDLQConsumer {

    @Inject
    private MessagingUtil msgUtil;

    @Inject
    private MessageRouteBuilder builder;

    private static final Logger LOG =
            LoggerFactory.getLogger(ScheduledMessageDLQConsumer.class);
    @PostConstruct
    public void init() {
        LOG.info("camel Scheduling scheduled started");
        CamelContext camelContext = new DefaultCamelContext();
        ConnectionFactory connectionFactory = msgUtil.getAMQConnectionFactory();
        camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        try {
            camelContext.addRoutes(builder);
            camelContext.start();
            LOG.info("Camel scheduling completed");
        } catch (Exception e) {
            // TODO Auto-generated catch block

            LOG.error("Error in registering camel route builder", e);
        }

        LOG.info(" camel Scheduling scheduled completed");
    }

}

The problem is this: the Camel route is enabled after 5 minutes and moves the message from the DLQ (DLQ.Consumer.OUTDOCS.VirtualTopic.queue1) to queue1 (Consumer.OUTDOCS.VirtualTopic.queue1). But if the message is a poison message, it comes back to the DLQ, the route moves it to the normal queue again, and this cycle repeats indefinitely.

My requirement is that the route should move each message from the DLQ to the queue only once per 5-minute interval. If a poison message comes back to the DLQ, it should be picked up again only after the next 5 minutes.

1 Answer

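First of all, your whole idea looks like bad design. Reprocessing and redelivery should be handled on the consumer or on the broker, without any obscure periodic "DLQMessageMover". If you control the application that consumes from OUTDOCS.VirtualTopic.queue1, rethink your approach to error handling.

By the way, a simple combination of maximumRedeliveries=-1 and redeliveryDelay=300000 on the consumer connection would have the same effect as all the code you have written in this question.

Secondly, you need an idempotent consumer whose correlation key is the header named JMSCorrelationID. It processes each correlation id only once. When you use MemoryIdempotentRepository, the repository is cleared whenever the route restarts, so the message is processed again after the restart, which matches your requirement.

I created a small example to show how it works. In your case you would not mock the JMSCorrelationID header, and you would use the jms component instead of the timer.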
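As a rough illustration of the redelivery settings mentioned above, the sketch below builds an ActiveMQ connection URI that carries both options through the `jms.redeliveryPolicy.` prefix. The class name `RedeliveryUri`, the helper `withRedelivery`, and the broker address `tcp://localhost:61616` are placeholders invented for this example, and it assumes the consuming application creates its connection factory from such a URI.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RedeliveryUri {

    // Hypothetical helper: appends redelivery policy options to a broker URL
    // that has no query string yet.
    public static String withRedelivery(String brokerUrl,
                                        int maximumRedeliveries,
                                        long redeliveryDelayMs) {
        Map<String, String> options = new LinkedHashMap<>();
        // -1 means "retry forever", as suggested in the answer.
        options.put("jms.redeliveryPolicy.maximumRedeliveries",
                String.valueOf(maximumRedeliveries));
        // Delay between redelivery attempts, in milliseconds.
        options.put("jms.redeliveryPolicy.redeliveryDelay",
                String.valueOf(redeliveryDelayMs));

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return brokerUrl + "?" + query;
    }

    public static void main(String[] args) {
        // Placeholder broker address; 300000 ms = 5 minutes.
        System.out.println(withRedelivery("tcp://localhost:61616", -1, 300_000L));
    }
}
```

The resulting string would be passed to the connection factory of the consumer of OUTDOCS.VirtualTopic.queue1, not to the route that moves messages out of the DLQ.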

public class IdempotentConsumerRouteBuilder extends RouteBuilder {
private final IdempotentRepository idempotentRepository = new MemoryIdempotentRepository();
private final List<String> mockCorrelationIds = Arrays.asList("id0","id0","id0","id1","id2","id0","id4","id0","id6","id7");

public void configure() {
    CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
    startPolicy.setRouteStopTime("0 0/5 * * * ?");
    startPolicy.setRouteStartTime("0 0/5 * * * ?");

    from("timer:jms?period=100")
            .routePolicy(startPolicy)
            .process(e -> e.getIn().setHeader(
                    "JMSCorrelationID", // Mock JMSCorrelationID so the timer can stand in for the jms component
                    mockCorrelationIds.get(e.getProperty("CamelTimerCounter", Integer.class)%10))
            )
            .idempotentConsumer(header("JMSCorrelationID"), idempotentRepository)
            .log("correlationId is ${header.JMSCorrelationID}")
            .to(("log:done?level=OFF"))
            .end();

}}

This code produces the following output:

[artzScheduler-camel-1_Worker-3] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id0
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id1
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id2
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id4
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id6
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id7
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
[el-1) thread #5 - ShutdownTask] DefaultShutdownStrategy        INFO  Route: route1 shutdown complete, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy        INFO  Graceful shutdown of 1 routes completed in 0 seconds
[artzScheduler-camel-1_Worker-6] DefaultCamelContext            INFO  Route: route1 is stopped, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-8] ScheduledRoutePolicy           WARN  Route is not in a started/suspended state and cannot be stopped. The current route state is Stopped
[artzScheduler-camel-1_Worker-7] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id0
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id1
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id2
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id4
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id6
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id7
[rtzScheduler-camel-1_Worker-10] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
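
To make the behaviour in this log easier to follow, here is a plain-Java sketch of the same idea with no Camel involved. The class `IdempotentFilterSketch` and its methods are made up for illustration and are not part of the Camel API. It only assumes what is described above: a key is processed once, and the in-memory store is emptied when the route restarts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentFilterSketch {

    // Stand-in for an in-memory idempotent repository: keys already seen.
    private final Set<String> seen = new HashSet<>();

    // Returns true the first time a key is offered, false for duplicates.
    public boolean firstTime(String correlationId) {
        return seen.add(correlationId);
    }

    // Simulates a route restart: the in-memory store is emptied.
    public void restart() {
        seen.clear();
    }

    // Feeds a batch of ids through the filter and returns those that pass,
    // in arrival order.
    public List<String> runCycle(List<String> incoming) {
        List<String> processed = new ArrayList<>();
        for (String id : incoming) {
            if (firstTime(id)) {
                processed.add(id);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Same mock ids as in the route example above.
        List<String> ids = Arrays.asList(
                "id0", "id0", "id0", "id1", "id2", "id0", "id4", "id0", "id6", "id7");
        IdempotentFilterSketch filter = new IdempotentFilterSketch();

        System.out.println(filter.runCycle(ids)); // [id0, id1, id2, id4, id6, id7]
        System.out.println(filter.runCycle(ids)); // [] because every id was already seen
        filter.restart();                         // the 5-minute restart
        System.out.println(filter.runCycle(ids)); // [id0, id1, id2, id4, id6, id7]
    }
}
```

Between restarts a poison message that returns to the DLQ is filtered out as a duplicate. After the restart it is let through once more, which is the "once every 5 minutes" behaviour asked for in the question.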
answered 2017-11-13T21:29:08.873