1

我正在使用 Spring Integration 使用 DSL 来处理 JMS 和 REST 服务之间的通信。要求是消息应该无限期地重新传递。在一种情况下,我必须顺序执行两个操作。如果第一个失败,则不应执行第二个,但如果是 4xx 错误,我不应该尝试重新传递它。我的代码如下所示:

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destinationn)).get())
   .publishSubscribeChannel(c -> c
      .subscribe(firstRestOperation ->                                                  
         firstRestOperation
         .transform(originalMessageToFirstRequestTransformer())
         .handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
                     .httpMethod(HttpMethod.POST).get())  //when this handler receives HTTP Status 4xx, 
                                                          //second operation shouldn't be executed and 
                                                          //and message shouldn't be redelievered
      .subscribe(secondRestOperation->
         secondRestOperation
         .transform(originalMessageToSecondRequestTransformer())
         .handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate).httpMethod(HttpMethod.POST).get())
).get();

class MyErrorHandler extends DefaultResponseErrorHandler { //this is used in Option B


    @Override
    public void handleError(ClientHttpResponse response) throws IOException {
        if(response.getStatusCode().is4xxClientError()){
            log.warn(...);
        }else{
            super.handleError(response);
        }
    }
}

@Bean
public RestTemplate restTemplate() {
   RestTemplate restTemplate = new RestTemplate();
   restTemplate.setErrorHandler(myErrorHandler); //this is used in Option B
   return restTemplate;
}

我怎样才能满足这些要求?我唯一的想法是在提交 JMS 会话时以某种方式中断 IntegrationFlow。

感谢您的任何建议。

更新

选项 A: 目前:

  • 操作 1 失败并出现任何错误
  • 操作 2 未执行
  • 消息被不确定地重新传递

选项 B:我也可以处理 4xx 错误,然后:

  • 操作 1 以 4xx 失败,已处理异常
  • 执行操作 2
  • 集成流程正常完成,提交 JMS 会话并且未重新传递消息

但这会导致执行操作 2

我需要的是:

  • 操作 1 以 4xx 失败

  • 操作 2执行

  • 消息重新发送

更新 2

我想我可能会有所进展。正如@gary-russel 建议的那样,我添加了错误通道,并处理了 4xx 错误:

    @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(errorHandlingChannel).get();
    }


    @Bean
    public PublishSubscribeChannel errorHandlingChannel() {
        return MessageChannels.publishSubscribe().get();
    }
    
    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }


    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(customErrorChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }
    

    @Bean
    public MessageChannel clientErrorMessageChannel(){
        return MessageChannels
                .direct()
                .get();
    }


    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn(...)    //handle error here
                .get();
    }


        @Bean
    public MessageChannel unhandledErrorsChannel(){
        return MessageChannels.direct().get();
    }
    
        @Bean
    public IntegrationFlow unhandledErrorsFlow(){
        //how should I implement it?
    }

我只想处理 4xx 错误,其余的应该被传播并应该导致 JMS 消息重新传递。我尝试不设置defaultOutputChannelErrorMessageExceptionTypeRouter而不是引发另一个异常)或设置defaultOutputChannel为默认值errorChannel(而不是处理所有错误)。

更新 3

在以下位置找到解决方案:Spring Integration Java DSL using JMS retry/redlivery

这是我的错误处理流程的代码:

 @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(customErrorChannel()).get();
    }

    @Bean
    public PublishSubscribeChannel customErrorChannel() {
        return MessageChannels.publishSubscribe().get();
    }

    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }

    @Bean
    public MessageChannel clientErrorMessageChannel(){
        return MessageChannels
                .direct()
                .get();
    }

    @Bean
    public MessageChannel unhandledErrorsChannel(){
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow unhandledErrorsFlow(){
        return IntegrationFlows.from(unhandledErrorsChannel()).handle("thisBeanName", "handleError").get();
    }

    public void handleError(Throwable t) throws Throwable {
        log.warn("Received unhandled exception");
        throw t;
    }

    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn("Received HTTP Status 4xx with message: " + ((MessageHandlingException)message.getPayload()).getCause().getMessage()))
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(customErrorChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }

因此解决方案是将异常重定向到将通过重新抛出它们来处理它们的流。太糟糕BaseIntegrationFlow了,没有接受和抛出的方法Throwable- 现在只能通过指定要调用的 bean 和方法名称来实现。

4

1 回答 1

1

这是默认行为;ignoreFailures除非属性是truefalse默认情况下),否则不会调用第二个订阅者。

您需要显示上游流程,但要“捕获”异常,您需要向(可能)消息驱动的入站适配器添加错误通道并在那里处理异常。

于 2022-01-19T20:42:30.080 回答