2

我有一个具有以下路线的应用程序:

from("netty:tcp://localhost:5150?sync=false&keepAlive=true")
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");

此路由每 59 毫秒接收一条新消息。当与数据库的连接丢失时,我想在第二条消息到达之前停止路由。主要是,我希望永远不会丢失消息

我是这样进行的:

我添加了一个errorHandler

errorHandler(deadLetterChannel("direct:backup")
.redeliveryDelay(5L)
.maximumRedeliveries(1)
.retryAttemptedLogLevel(LoggingLevel.WARN)
.logExhausted(false));

errorHandler尝试重新传递消息,如果再次失败,它会将消息重定向到deadLetterChannel.

以下 deadLetterChannel 将停止tcp.input路由并尝试将消息重新传递到数据库:

RoutePolicy policy = new StopRoutePolicy();
from("direct:backup")
.routePolicy(policy)
.errorHandler(
  defaultErrorHandler()
  .redeliveryDelay(1000L)
  .maximumRedeliveries(-1)
  .retryAttemptedLogLevel(LoggingLevel.ERROR)
)
.to("jdbc:mydb");

这是的代码routePolicy

public class StopRoutePolicy extends RoutePolicySupport {

  private static final Logger LOG = LoggerFactory.getLogger(String.class);

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    String stop = "tcp.input";
    CamelContext context = exchange.getContext();
    if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
      try {
        exchange.getContext().getInflightRepository().remove(exchange);
        LOG.info("STOP ROUTE: {}", stop);
        context.stopRoute(stop);
      } catch (Exception e) {
        getExceptionHandler().handleException(e);
      }
    }
  }

}

我对这种方法的问题是:

  • 在我的"direct:backup"路线中,如果我将其设置maximumRedeliveries为 -1,路线tcp.input将永远不会停止
  • 我在停车期间丢失了消息
  • 这种检测连接丢失和停止路由的方法太长了

拜托,有没有人有想法让这个更快或者为了不丢失信息而做出不同的改变?

4

1 回答 1

3

我终于找到了解决问题的方法。为了让应用程序更快,我用 seda添加了异步进程多线程。

from("netty:tcp://localhost:5150?sync=false&keepAlive=true").to("seda:break");


from("seda:break").threads(5)
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");

我对备份路由做了同样的事情。

from("seda:backup")
.routePolicy(policy)
.errorHandler(
  defaultErrorHandler()
  .redeliveryDelay(1000L)
  .maximumRedeliveries(-1)
  .retryAttemptedLogLevel(LoggingLevel.ERROR)
).threads(2).to("jdbc:mydb");

我像这样修改了 routePolicy:

public class StopRoutePolicy extends RoutePolicySupport {

  private static final Logger LOG = LoggerFactory.getLogger(String.class);

  @Override
  public void onExchangeBegin(Route route, Exchange exchange) {
    String stop = "tcp.input";
    CamelContext context = exchange.getContext();
    if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
      try {
        exchange.getContext().getInflightRepository().remove(exchange);
        LOG.info("STOP ROUTE: {}", stop);
        context.stopRoute(stop);
      } catch (Exception e) {
        getExceptionHandler().handleException(e);
      }
    }
  }

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    String stop = "tcp.input";
    CamelContext context = exchange.getContext();
    if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStopped()) {
      try {
        LOG.info("RESTART ROUTE: {}", stop);
        context.startRoute(stop);
      } catch (Exception e) {
        getExceptionHandler().handleException(e);
      }
    }
  }
}

通过这些更新,TCP 路由在处理备份路由之前停止。当 jdbc 连接恢复时,路由会重新启动

现在,多亏了 Camel,应用程序能够在不丢失消息和人工干预的情况下处理数据库故障。

我希望这可以帮助你。

于 2013-07-18T16:14:03.793 回答