1

我定义了两个使用这个组件的集成流。一个从 ftp 读取,一个从磁盘读取文件。

@Bean
public IntegrationFlow csvLineFlowDefinition() {
    return IntegrationFlows.from(CHANNEL_NAME)
            .filter(String.class, m -> {
                // filter to remove column definition csv line
                return !m.startsWith("ID");
            })
            .<String, MyPrettyObject>transform(csvLinePayload -> {
                String[] array = csvLinePayload.split(",");

                MyPrettyObject myPrettyObject = new MyPrettyObject();
                myPrettyObject.setId(array[0]);
                myPrettyObject.setType(array[1]);

                return myPrettyObject;
            })
            .<MyPrettyObject, String>route(myPrettyObject -> myPrettyObject.getType(),
                    routeResult -> routeResult
                            .channelMapping("AA", "AA_CHANNEL")
                            .channelMapping("BB", "BB_CHANNEL")
                            .channelMapping("CC", "CC_CHANNEL"))
            .get();
}

如果从 ftp 读取或从磁盘读取文件出现问题,我希望这两个 IntegrationFlows 失败。他们定义了自己的错误通道。我不希望在将 csv 行转换为 MyPrettyObject 以达到这两个 IntegrationFlow 时出现错误。

我曾考虑将原始 csv 行分派到消息队列,然后我可以在此消息队列的入站使用者上定义特定的错误通道。

然而,这似乎有点矫枉过正。

我试图为转换器插入一个 ExpressionEvaluatingRequestHandlerAdvice,但我不确定如何正确使用它,并且消息没有到达路由器或 ERROR_CHANNEL_NAME

@Bean
public ExpressionEvaluatingRequestHandlerAdvice csvLineTransformerAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice expressionEvaluatingRequestHandlerAdvice = new ExpressionEvaluatingRequestHandlerAdvice();
    expressionEvaluatingRequestHandlerAdvice.setFailureChannelName(ERROR_CHANNEL_NAME);
    expressionEvaluatingRequestHandlerAdvice.setTrapException(true);
    return expressionEvaluatingRequestHandlerAdvice;
}

.<String, MyPrettyObject>transform(csvLinePayload -> {
                String[] array = csvLinePayload.split(",");

                MyPrettyObject myPrettyObject = new MyPrettyObject();
                myPrettyObject.setId(array[0]);
                myPrettyObject.setType(array[1]);

                return myPrettyObject;
            }, t -> t.advice(csvLineTransformerAdvice()))
4

1 回答 1

1

恐怕“阅读有问题”没有到达错误通道,因为还没有消息要处理。因此,将入站通道适配器与流的其余部分隔离可能不是一个好主意。这对于将任何下游错误传播到入站通道适配器上的错误通道来说是很正常的。

ExpressionEvaluatingRequestHandlerAdvice是正确的方法,但您应该记住,它仅适用于transformer. 下游流程尚未参与该建议。

如果出现错误,流程将停止,并且由于错误而确实无法到达下一个端点。不知道你有什么顾虑...

于 2017-06-16T13:35:08.987 回答