我需要拆分消息并一一处理。如果任何消息失败,我想将其报告给错误通道并继续处理下一个可用消息
我正在使用带有 1.0.0-SNAPSHOT 的 spring cloud aws 流启动器
我使用拆分器编写了一个示例程序
@Bean
public MessageChannel channelSplitOne() {
return new DirectChannel();
}
@StreamListener(INTERNAL_CHANNEL)
public void channelOne(String message) {
if (message.equals("l")) {
throw new RuntimeException("Error due to l");
}
System.out.println("Internal: " + message);
}
@Splitter(inputChannel = Sink.INPUT, outputChannel = INTERNAL_CHANNEL)
public List<Message> extractItems(Message<String> input) {
return Arrays.stream(input.getPayload().split(""))
.map(s -> MessageBuilder.withPayload(s).copyHeaders(input.getHeaders()).build())
.collect(Collectors.toList());
}
当我发送消息为Hello时,期望'h','e','o'应该被处理,但是'l'应该被报告为错误。
但是这里在'l'之后,处理没有恢复。
有什么办法可以做到这一点。