1

我有以下流程:

@Resource(name = S3_CLIENT_BEAN)
private MessageSource<InputStream> messageSource;

public IntegrationFlow fileStreamingFlow() {
    return IntegrationFlows.from(s3Properties.getFileStreamingInputChannel())
        .enrichHeaders(spec -> spec.header(ERROR_CHANNEL, S3_ERROR_CHANNEL, true))
        .handle(String.class, (fileName, h) -> {
                    if (messageSource instanceof S3StreamingMessageSource) {
                        S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;

                        ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
                        chainFileListFilter.addFilters(...);
                        s3StreamingMessageSource.setFilter(chainFileListFilter);

                        return s3StreamingMessageSource.receive();
                    }                    
                    return messageSource.receive();
                }, spec -> spec
                        .requiresReply(false) // in case all messages got filtered out
        )
        .channel(s3Properties.getFileStreamingOutputChannel())
        .get();
}

我发现如果s3StreamingMessageSource.receive抛出异常,错误最终会出现在为管道中的前一个流配置的错误通道中,而不是S3_ERROR_CHANNEL为此流配置的那个。不确定它是否与这个问题有关。

4

1 回答 1

1

是从以下s3StreamingMessageSource.receive()位置调用的SourcePollingChannelAdapter

protected Message<?> receiveMessage() {
    return this.source.receive();
}

这个来自AbstractPollingEndpoint

private boolean doPoll() {

        message = this.receiveMessage();
...

        this.handleMessage(message);
...
}

这样handleMessage()做:

this.messagingTemplate.send(getOutputChannel(), message);

.enrichHeaders(spec -> spec.header(ERROR_CHANNEL, S3_ERROR_CHANNEL, true))因此,这绝对距离提到的下游还很远。

但是,您仍然可以在其中捕获异常S3_ERROR_CHANNEL。注意 的第二个参数IntegrationFlows.from()

IntegrationFlows.from(s3Properties.getFileStreamingInputChannel(),
           e -> e.poller(Pollers.fixedDelay(...)
                       .errorChannel(S3_ERROR_CHANNEL)))

或者,根据您当前的情况,您在某处有一个全局轮询器,因此在errorChannel那里配置一个。

于 2018-04-19T01:54:13.550 回答