1

我正在尝试添加过滤器以丢弃流并在失败后继续执行主流并聚合拆分器。错误和成功的预期类型相同。没有特定的聚合器逻辑。

@Bean
public IntegrationFlow flow() {
     return f -> f
         .split(Orders.class, Orders::getItems)
         .enrich(e -> e.requestChannel("enrichChannel"))
         .filter(Order.class, c -> c.getId() > 10 ? true : false,
             e -> e.discardChannel(validationError()))
         .handle(new MyHandler())
         .transform(new MapToObjectTransformer(Order.class))
         .enrich(e -> e.requestChannel("transformChannel"))
         .filter(Order.class, c -> c.getTotal() > 100 ? true : false,
             e -> e.discardChannel(validationError())).handle( new transformer())
         .aggregate();
 }
            
@Bean
public IntegrationFlow validationErrorFlow() {
 return IntegrationFlows.from(validationError())
         .handle(new ValidationHandler())
         .get();
}

丢弃通道没有加入回主流以执行拆分中的下一个项目。

我可以编写路由和子流映射,但这会变得过于嵌套在路由 -> 子流 -> 路由 -> 子流中,试图通过使用过滤器来解决这个问题。是否有更好的方法来执行验证并仍然继续拆分流程中的所有项目。

更新1:

.handle(request.class, (p, h) -> validator.validate(p)
.gateway("filterFlow.input")
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.aggregate();



@Bean
    public IntegrationFlow filterFlow() {
        return f -> f
                .filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
                        (flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
    }

网关能够拦截请求,但执行的流程.handle(new MyHandler())而不是下一个项目split()

更新 2:(答案)来自 Artem

.handle(request.class, (p, h) -> validator.validate(p))
    .filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
    .handle(new MyHandler())
    .enrich(...)
    .handle(...)
    .enrich(...)
    .handle(...)
    .enrich(...)
    .handle(...)
    .channel("aggregatorChannel")
    .aggregate();

这将有条件地跳过并继续流程。

4

1 回答 1

0

丢弃通道没有加入回主流以执行拆分中的下一个项目。

确实如此。这就是它的设计方式。在大多数情况下,丢弃流程类似于 JMS 中的死信队列。所以,这是短的单向分支。

如果你真的想回到主流,你应该考虑在流定义中使用命名通道。我的意思是在您想在补偿(丢弃)流程之后回来的地方:

.filter(Order.class, c -> c.getId() > 10 ? true : false,
                    e -> e.discardFlow(sf -> sf
                            .gateway(validationError())
                            .channel("myHandleChannel")))
            .channel("myHandleChannel")
            .handle(new MyHandler())

我使用gateway()是因为我们需要丢弃流的回复才能继续处理。我们需要.channel("myHandleChannel")在子流结束时使用它,因为丢弃流是一个分支。

另一种方法可以通过.gateway()on 主流程实现:

.gateway("filterFlow.input")
.handle(new MyHandler())

...

@Bean
public IntegrationFlow filterFlow() {
    return f -> f
            .filter(Order.class, c -> c.getId() > 10 ? true : false,
                    e -> e.discardChannel(validationError()));
}

我们发送到discardChannel相同的请求消息,因此replyChannel上述网关的正确标头仍然存在。您只需要确保从.handle(new ValidationHandler()).

于 2018-02-21T14:56:25.160 回答