2

我有一条骆驼路线,例如:

<route errorHandlerRef="myDeadLetterErrorHandler">
    <from uri="activemq:queue:source"/>
    <to uri="activemq:queue:destA">
    <to uri="activemq:queue:destB">
    <to uri="activemq:queue:destC">
</route>

当一个端点失败时,我设置了 redeliveryPolicy 以重试发送消息几次,如果它总是失败,则将消息重新传递到 DeadLetter 队列。

现在我正在寻找一种将消息从死信队列发送到失败端点的方法。有人有什么建议吗?

我正在考虑构建一个处理器来提取故障端点的信息,如下所示:

String lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);

然后构建某种动态路由......没有更简单的解决方案吗?

4

3 回答 3

0

如果你相信你可以简单地重新发送消息,它会在以后工作,然后增加重新发送计数。

否则,您可能需要一个更复杂的解决方案,因为您可能会对消息进行一些更改以使其通过。也就是说,回答“为什么失败”的问题。在许多情况下,您需要人工查看消息/错误以确定是否可以完成某些操作。你需要什么逻辑很大程度上取决于你已经实现了什么。

对于暂时性错误(网络故障,数据库关闭问题,磁盘已满..等),只需按照您所说的重新发送,对于应用程序错误,您需要三思而后行。

于 2012-10-29T11:58:26.943 回答
0

另一种选择是只处理 Camel 中的错误,而不是依赖 DLQ 重新传递逻辑。这使您可以更好地控制特定的错误场景,并可以根据需要对它们进行分类/重试。

像这样的东西......有关更多详细信息,请参阅民意调查消费者

//main route to process message from a queue (needs to be fast)
from("activemq:queue:mainQ").process(...);

//handle any errors by simply moving them to an error queue (for retry later)
onException(Exception.class)
    .handled(true).to("activemq:queue:mainErrorQ");

//retry the error queue
from("timer://retryTimer?fixedRate=true&period=60000")
    .bean(myBean, "retryErrors"); 

...

public void retryErrors() {
    // loop to empty queue
    while (true) {
        // receive the message from the queue, wait at most 3 sec
        Exchange msg = consumer.receive("activemq:queue.mainErrorQ", 3000);
        if (msg == null) {
            // no more messages in queue
            break;
        }

        // send it to the starting queue
        producer.send("activemq:queue.mainQ", msg);
    }
}   
于 2012-10-29T18:13:30.753 回答
0

感谢大家的回答,我很快解决了将端点放在单独的路由中并为每个人设置相同的 redeliveryPolicy 的问题。

    <route>
        <from uri="activemq:queue:source"/>
        <to uri="activemq:queue:destA1">
        <to uri="activemq:queue:destB1">
        <to uri="activemq:queue:destC1">
    </route>

<route errorHandlerRef="myDeadLetterErrorHandler">
    <from uri="activemq:queue:destA1"/>
    <to uri="activemq:queue:destA">
</route>

<route errorHandlerRef="myDeadLetterErrorHandler">
    <from uri="activemq:queue:destB1"/>
    <to uri="activemq:queue:destB">
</route>

...

通过这种方式,消息将始终传递到每个端点,如果一个端点失败并且当失败端点返回时,消息将被重新传递。

我认为boday提出的解决方案是正确的,将来我可能会尝试那个解决方案。我还发现,如果总消息顺序不重要,我们还可以使用“代理重新传递”,它使用特殊队列重新传递每条消息。 http://activemq.apache.org/message-redelivery-and-dlq-handling.html

于 2012-10-30T18:49:44.983 回答