I have written a Spring Cloud Stream application that listens to a Kinesis stream.

Message flow across the channels:

Kinesis Input --> Transformer --> Router (route to error or actual processor)

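I plan to add error handling across these channels. Currently I catch errors explicitly and send them to a different channel. Rather than catching exceptions and modifying the payload, I would like to implement a generic exception handler.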

Binder

    public interface Binder {

      @Input(ITEM_KINESIS_INPUT)
      SubscribableChannel itemMessage();

    }

Configuration

    @Configuration
    @EnableIntegration
    public class IntegrationConfiguration {

      @Bean(name = DEFAULT_ITEM_LISTENER)
      public MessageChannel defaultItemChannel() {
        return new DirectChannel();
      }

      @Bean(name = DEFAULT_ERROR)
      public MessageChannel defaultErrorChannel() {
        return new DirectChannel();
      }

      @Bean
      @Transformer(inputChannel = ITEM_KINESIS_INPUT, outputChannel = ITEM_TRANSFORMER)
      public ItemTransformer itemTransformer() {
        return new ItemTransformer();
      }


      @ServiceActivator(inputChannel = ITEM_TRANSFORMER)
      @Bean
      public ItemMessageTypeRouter router() {
        return new ItemMessageTypeRouter();
      }

      // I also tried this, but it is not working
      @ServiceActivator(inputChannel = "errorChannel")
      @Bean
      public AbstractMessageHandler process() {
        return new AbstractMessageHandler() {
          @Override
          protected void handleMessageInternal(Message<?> message1) throws Exception {
            System.out.println("MessageHandler SA");
          }
        };

      }

    }

Transformer

    public class ItemTransformer extends AbstractTransformer {


      @Override
      public String getComponentType() {
        return "transformer";
      }



      @Override
      protected Object doTransform(Message<?> message) throws Exception {
        Object transformedPayload = null;
        try {
          //transform
        } catch (Exception e) {
          transformedPayload =
              new MessageTransformationException(message, "failed to transform message", e);
        }
        return MessageBuilder.withPayload(transformedPayload).copyHeaders(message.getHeaders()).build();
      }
    }

And in the router, errors are routed to a different channel:

    public class ItemMessageTypeRouter extends AbstractMappingMessageRouter {

      @Override
      protected List<Object> getChannelKeys(Message<?> message) {
        // Payloads that carry an exception go to the error channel; everything else goes to the processor
        return (message.getPayload() instanceof Throwable) ? Arrays.asList(DEFAULT_ERROR)
            : Arrays.asList(DEFAULT_ITEM_LISTENER);
      }
    }

Error handling

    @EnableBinding(Binder.class)
    public class ErrorListener implements MessageListener<Message> {

      @Autowired
      private MessagesBackupRepository backupRepository;

      @Override
      @StreamListener(DEFAULT_ERROR)
      public void receiveMessage(Message message) {
        log.error("Error has been reported " + message);
      }

    }

Can we implement a generic exception handler for all of these channels?

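Currently I handle errors only in the Transformer, but if an exception occurs in the Router or in a later channel, that would also have to be handled manually.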
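
To make the goal concrete, here is a minimal plain-Java sketch of the behavior I am after, with no Spring involved. Every stage runs without its own try/catch, and a failure in any stage reaches one shared handler together with the original, unmodified payload. The names `GenericErrorHandlingSketch`, `Failure`, and `runPipeline` and the lambda stages are hypothetical and exist only for illustration; they are not Spring Integration or Spring Cloud Stream APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class GenericErrorHandlingSketch {

    /** Hypothetical error record: the original payload plus the exception that was thrown. */
    static final class Failure {
        final Object failedPayload;
        final Exception cause;

        Failure(Object failedPayload, Exception cause) {
            this.failedPayload = failedPayload;
            this.cause = cause;
        }
    }

    /**
     * Runs the stages in order. An exception thrown by any stage is passed to the
     * single errorHandler along with the original payload; no stage catches anything itself.
     * Returns the final result, or null if a stage failed.
     */
    static Object runPipeline(Object payload,
                              List<Function<Object, Object>> stages,
                              Consumer<Failure> errorHandler) {
        Object current = payload;
        try {
            for (Function<Object, Object> stage : stages) {
                current = stage.apply(current);
            }
            return current;
        } catch (Exception e) {
            errorHandler.accept(new Failure(payload, e));
            return null;
        }
    }

    public static void main(String[] args) {
        List<Failure> errors = new ArrayList<>();

        // Stand-ins for the transformer and the router; neither contains error handling.
        Function<Object, Object> transformer = p -> p.toString().trim();
        Function<Object, Object> router = p -> {
            if (p.toString().isEmpty()) {
                throw new IllegalStateException("no route for empty payload");
            }
            return "routed:" + p;
        };
        List<Function<Object, Object>> stages = Arrays.asList(transformer, router);

        System.out.println(runPipeline(" item-1 ", stages, errors::add));
        System.out.println(runPipeline("   ", stages, errors::add));
        System.out.println("errors collected: " + errors.size());
    }
}
```

In this sketch the router failure is reported the same way a transformer failure would be, which is the behavior I would like to get from the channels above without wrapping the exception into the payload myself.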

Any insights that would help me handle errors better would be appreciated.
