2

我已经用 Spring Integration 定义了一个很好的流程,在名义上它可以按照我的意愿工作。

但是我还没有想出一种方法来定义处理错误的行为(即将输​​入行标记为失败)

这是我得到的错误:

 [ask-scheduler-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has exited due to an exception while sending the request message:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [outboundGateway]; nested exception is...

这是我的流程配置(简化):

@Configuration
@EnableIntegration
public class IntegrationConfig {


    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow mainFlow() {
        //noinspection unchecked
        return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(5000)
                        .transactional(transactionManager())))
                .split()
                .enrich(e -> e
                        .requestChannel(subChannel())
                        .requestPayload(Message::getPayload)
                        .propertyExpression("fooId", "payload.id"))
                .handle(barHandler())
                .get();
    }

    @Bean
    public IntegrationFlow enricherFlow() {
        return IntegrationFlows.from(subChannel())
                .handle(outboundGateway())
                .get();
    }

    @Bean
    public MessageChannel subChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource() {
        return new JdbcPollingChannelAdapter(this.dataSource,
                "select * from FOO");
    }

    @Bean
    public JdbcOutboundGateway outboundGateway() {
        ...
    }

    @Bean
    public JdbcMessageHandler barHandler() {
        return new JdbcMessageHandler(dataSource,
                "INSERT INTO BAR ...");
    }


    @Bean
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

我原以为我可以在默认的 errorChannel 上得到错误并在那里执行必要的任务,但我还没有找到方法来完成这项工作。

任何想法和帮助都非常感谢!

4

1 回答 1

1

您知道,我们似乎错过了errorChannelEnricherSpecJava DSL 公开的内容 - 请随时就此事提出JIRA 问题。但无论如何,看起来我们可以访问该属性:

.enrich(e -> {
            ContentEnricher contentEnricher =
                    e.requestChannel(subChannel())
                            .requestPayload(Message::getPayload)
                            .propertyExpression("fooId", "payload.id"))
                            .get().getT2();
            contentEnricher.setErrorChannel(enricherErrorChannel());
        })

现在,您可以向其中添加任何.handle()流程enricherErrorChannel()ErrorMessage在钓鱼时进行处理。例如,通常ErrorMessage包含一个我们拥有属性payload的地方。正是那个包含有关您的原始信息的信息,以及:MessagingExceptionfailedMessagepayloadheaders

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/configuration.html#namespace-errorhandler

于 2018-01-19T14:23:01.097 回答