我已经编写了监听 Kinesis 流的 Spring Cloud Stream 应用程序
跨渠道的消息流
Kinesis Input --> Transformer --> Router (route to error or actaul processor)
我计划编写跨这些渠道的错误处理。目前我明确地捕捉到错误并发送到不同的渠道。我不想捕获和修改有效负载,而是想实现通用异常处理程序
粘合剂
public interface Binder {
@Input(ITEM_KINESIS_INPUT)
SubscribableChannel itemMessage();
}
配置
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
@Bean(name = DEFAULT_ITEM_LISTENER)
public MessageChannel defaultItemChannel() {
return new DirectChannel();
}
@Bean(name = DEFAULT_ERROR)
public MessageChannel DEFAULT_ERROR() {
return new DirectChannel();
}
@Bean
@Transformer(inputChannel = ITEM_KINESIS_INPUT, outputChannel = ITEM_TRANSFORMER)
public ItemTransformer itemTransformer() {
return new ItemTransformer();
}
@ServiceActivator(inputChannel = ITEM_TRANSFORMER)
@Bean
public ItemMessageTypeRouter router() {
return new ItemMessageTypeRouter();
}
// I tried this as well Not working
@ServiceActivator(inputChannel = "errorChannel")
@Bean
public AbstractMessageHandler process() {
return new AbstractMessageHandler() {
@Override
protected void handleMessageInternal(Message<?> message1) throws Exception {
System.out.println("MessageHandler SA");
}
};
}
}
变压器
public class ItemTransformer extends AbstractTransformer {
@Override
public String getComponentType() {
return "transformer";
}
@Override
protected Object doTransform(Message<?> message) throws Exception {
Object transformedPayload = null;
try {
//transform
} catch (Exception e) {
transformedPayload =
new MessageTransformationException(message, "failed to transform message", e);
}
return MessageBuilder.withPayload(transformedPayload).copyHeaders(message.getHeaders()).build();
}
}
并在路由器路由到不同的通道错误
public class ItemMessageTypeRouter extends AbstractMappingMessageRouter {
@Override
protected List<Object> getChannelKeys(Message<?> message) {
return (message.getPayload() instanceof Throwable) ? Arrays.asList(DEFAULT_ERROR)
: Arrays.asList(DEFAULT_ITEM_LISTENER)
}
}
错误处理
@EnableBinding(Binder.class)
public class ErrorListener implements MessageListener<Message> {
@Autowired
private MessagesBackupRepository backupRepository;
@Override
@StreamListener(DEFAULT_ERROR)
public void receiveMessage(Message message) {
log.error("Error has been reported " + message);
}
}
我们能否实现所有这些通道的通用异常处理程序。
目前我只处理Transformer中的错误,但是如果Router或以后的通道出现异常,那么这也需要手动处理
任何有助于更好地处理错误的见解