0

我需要拆分消息并一一处理。如果任何消息失败,我想将其报告给错误通道并继续处理下一个可用消息

我正在使用带有 1.0.0-SNAPSHOT 的 spring cloud aws 流启动器

我使用拆分器编写了一个示例程序

     @Bean
      public MessageChannel channelSplitOne() {
        return new DirectChannel();
      }


      @StreamListener(INTERNAL_CHANNEL)
      public void channelOne(String message) {
        if (message.equals("l")) {
          throw new RuntimeException("Error due to l");
        }
        System.out.println("Internal: " + message);
      }


      @Splitter(inputChannel = Sink.INPUT, outputChannel = INTERNAL_CHANNEL)
      public List<Message> extractItems(Message<String> input) {
        return Arrays.stream(input.getPayload().split(""))
            .map(s -> MessageBuilder.withPayload(s).copyHeaders(input.getHeaders()).build())
            .collect(Collectors.toList());
      }

当我发送消息为Hello时,期望'h','e','o'应该被处理,但是'l'应该被报告为错误。

但是这里在'l'之后,处理没有恢复。

有什么办法可以做到这一点。

4

1 回答 1

1

你可以这样做,但使用@ServiceActivator而不是@StreamListener. 第一个adviceChain选项可以注入ExpressionEvaluatingRequestHandlerAdvicehttps ://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice 。

拆分器的问题就像 Java 中的常规循环,所以要在出错后继续,我们需要以某种方式添加 try...catch 那里。但这已经不是拆分器的责任。因此,我们必须将这样的逻辑移动到我们有错误问题的地方。

于 2018-05-08T12:05:39.250 回答