我正在使用 Spring Integration 使用 DSL 来处理 JMS 和 REST 服务之间的通信。要求是消息应该无限期地重新传递。在一种情况下,我必须顺序执行两个操作。如果第一个失败,则不应执行第二个,但如果是 4xx 错误,我不应该尝试重新传递它。我的代码如下所示:
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destinationn)).get())
.publishSubscribeChannel(c -> c
.subscribe(firstRestOperation ->
firstRestOperation
.transform(originalMessageToFirstRequestTransformer())
.handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
.httpMethod(HttpMethod.POST).get()) //when this handler receives HTTP Status 4xx,
//second operation shouldn't be executed and
//and message shouldn't be redelievered
.subscribe(secondRestOperation->
secondRestOperation
.transform(originalMessageToSecondRequestTransformer())
.handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate).httpMethod(HttpMethod.POST).get())
).get();
class MyErrorHandler extends DefaultResponseErrorHandler { //this is used in Option B
@Override
public void handleError(ClientHttpResponse response) throws IOException {
if(response.getStatusCode().is4xxClientError()){
log.warn(...);
}else{
super.handleError(response);
}
}
}
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.setErrorHandler(myErrorHandler); //this is used in Option B
return restTemplate;
}
我怎样才能满足这些要求?我唯一的想法是在提交 JMS 会话时以某种方式中断 IntegrationFlow。
感谢您的任何建议。
更新
选项 A: 目前:
- 操作 1 失败并出现任何错误
- 操作 2 未执行
- 消息被不确定地重新传递
选项 B:我也可以处理 4xx 错误,然后:
- 操作 1 以 4xx 失败,已处理异常
- 执行操作 2
- 集成流程正常完成,提交 JMS 会话并且未重新传递消息
但这会导致执行操作 2
我需要的是:
操作 1 以 4xx 失败
操作 2未执行
消息未重新发送
更新 2
我想我可能会有所进展。正如@gary-russel 建议的那样,我添加了错误通道,并处理了 4xx 错误:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(errorHandlingChannel).get();
}
@Bean
public PublishSubscribeChannel errorHandlingChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
router.setDefaultOutputChannel(unhandledErrorsChannel());
return router;
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(customErrorChannel())
.log()
.route(errorMessageExceptionTypeRouter())
.get();
}
@Bean
public MessageChannel clientErrorMessageChannel(){
return MessageChannels
.direct()
.get();
}
@Bean
public IntegrationFlow clientErrorFlow() {
return IntegrationFlows.from(clientErrorMessageChannel())
.handle(message -> log.warn(...) //handle error here
.get();
}
@Bean
public MessageChannel unhandledErrorsChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow unhandledErrorsFlow(){
//how should I implement it?
}
我只想处理 4xx 错误,其余的应该被传播并应该导致 JMS 消息重新传递。我尝试不设置defaultOutputChannel
(ErrorMessageExceptionTypeRouter
而不是引发另一个异常)或设置defaultOutputChannel
为默认值errorChannel
(而不是处理所有错误)。
更新 3
在以下位置找到解决方案:Spring Integration Java DSL using JMS retry/redlivery
这是我的错误处理流程的代码:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(customErrorChannel()).get();
}
@Bean
public PublishSubscribeChannel customErrorChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
router.setDefaultOutputChannel(unhandledErrorsChannel());
return router;
}
@Bean
public MessageChannel clientErrorMessageChannel(){
return MessageChannels
.direct()
.get();
}
@Bean
public MessageChannel unhandledErrorsChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow unhandledErrorsFlow(){
return IntegrationFlows.from(unhandledErrorsChannel()).handle("thisBeanName", "handleError").get();
}
public void handleError(Throwable t) throws Throwable {
log.warn("Received unhandled exception");
throw t;
}
@Bean
public IntegrationFlow clientErrorFlow() {
return IntegrationFlows.from(clientErrorMessageChannel())
.handle(message -> log.warn("Received HTTP Status 4xx with message: " + ((MessageHandlingException)message.getPayload()).getCause().getMessage()))
.get();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(customErrorChannel())
.log()
.route(errorMessageExceptionTypeRouter())
.get();
}
因此解决方案是将异常重定向到将通过重新抛出它们来处理它们的流。太糟糕BaseIntegrationFlow
了,没有接受和抛出的方法Throwable
- 现在只能通过指定要调用的 bean 和方法名称来实现。