我有一个具有以下路线的应用程序:
from("netty:tcp://localhost:5150?sync=false&keepAlive=true")
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");
此路由每 59 毫秒接收一条新消息。当与数据库的连接丢失时,我想在第二条消息到达之前停止路由。主要是,我希望永远不会丢失消息。
我是这样进行的:
我添加了一个errorHandler
:
errorHandler(deadLetterChannel("direct:backup")
.redeliveryDelay(5L)
.maximumRedeliveries(1)
.retryAttemptedLogLevel(LoggingLevel.WARN)
.logExhausted(false));
我errorHandler
尝试重新传递消息,如果再次失败,它会将消息重定向到deadLetterChannel
.
以下 deadLetterChannel 将停止tcp.input
路由并尝试将消息重新传递到数据库:
RoutePolicy policy = new StopRoutePolicy();
from("direct:backup")
.routePolicy(policy)
.errorHandler(
defaultErrorHandler()
.redeliveryDelay(1000L)
.maximumRedeliveries(-1)
.retryAttemptedLogLevel(LoggingLevel.ERROR)
)
.to("jdbc:mydb");
这是的代码routePolicy
:
public class StopRoutePolicy extends RoutePolicySupport {
private static final Logger LOG = LoggerFactory.getLogger(String.class);
@Override
public void onExchangeDone(Route route, Exchange exchange) {
String stop = "tcp.input";
CamelContext context = exchange.getContext();
if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
try {
exchange.getContext().getInflightRepository().remove(exchange);
LOG.info("STOP ROUTE: {}", stop);
context.stopRoute(stop);
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
}
}
}
我对这种方法的问题是:
- 在我的
"direct:backup"
路线中,如果我将其设置maximumRedeliveries
为 -1,路线tcp.input
将永远不会停止 - 我在停车期间丢失了消息
- 这种检测连接丢失和停止路由的方法太长了
拜托,有没有人有想法让这个更快或者为了不丢失信息而做出不同的改变?