1

我有两个线程,线程中的每个方法都会引发异常。如何获取每个线程中引发的所有错误?在此代码中,错误通道仅捕获其中一个错误。基本上我的目标是捕获所有错误并将它们发送给调用者(休息控制器)。任何帮助将不胜感激。谢谢。

集成.java

public IntegrationFlow provisionUserFlow() {
return IntegrationFlows.from("input.channel")
  .publishSubscribeChannel(Executors.newCachedThreadPool(),
      s -> s
            .subscribe(f -> f.handle(provisionerA, "provision"))
            .subscribe(f -> f.handle(provisionerB, "provision"))
  .get();
}

@ServiceActivator( inputChannel = "errorChannel", outputChannel = "replyChannel")

public boolean processErrors(Exception message) throws RuntimeException{

System.out.println("Message" + message.getMessage());
System.out.println ("******************************");

throw new RuntimeException(message.getMessage());
}

MGateway.java

@MessagingGateway(errorChannel = "errorChannel")
public interface MGateway {

@Gateway(requestChannel = "input.channel", replyChannel = "replyChannel") 
boolean invokeProvisioner(User user);
}

解决方案

@Bean
public IntegrationFlow provisionUserFlow() {
return
    IntegrationFlows.from("input.channel")
    .publishSubscribeChannel(Executors.newCachedThreadPool(),
        s -> s.applySequence(true)
            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .handle(provisionerA, "provision")
                .channel("aggregatorChannel")
            )
            .subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .handle(provisionerB, "provision")
                .channel("aggregatorChannel"))
            )
        .get();
}

@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregatorChannel")
    .channel( aggregatorChannel)
    .aggregate( a -> a.processor( collect, "aggregatingMethod"))
    .get();
}

@Transformer( inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

Message<?> failedMessage =  ((MessagingException) errorMessage.getPayload())
    .getFailedMessage();

Exception exception = (Exception) errorMessage.getPayload();

return  MessageBuilder.withPayload( exception.getMessage())
       .copyHeadersIfAbsent( failedMessage.getHeaders() )
       .build();
}
4

1 回答 1

0

你看到@Gateway的只是Java方法。它有一个返回值,可能会抛出一个异常。我仍然很困惑为什么人们认为 Spring Integration 的工作方式有所不同。它完全基于 Java 并且没有任何魔法 - 只调用 java 方法。

现在让我们想象一下如果您只使用原始 Java 开发您会做什么。是的,您将等待来自两个线程的依赖并构建一个返回给调用者。

我们可以对 Spring Integration 做同样的事情。只需要使用AggregatorEIP。您可以在该错误通道中捕获错误消息并通过它们failedMessage的 s 关联它们。.publishSubscribeChannel()可以提供以下选项:

/**
 * Specify whether to apply the sequence number and size headers to the
 * messages prior to invoking the subscribed handlers. By default, this
 * value is <code>false</code> meaning that sequence headers will
 * <em>not</em> be applied. If planning to use an Aggregator downstream
 * with the default correlation and completion strategies, you should set
 * this flag to <code>true</code>.
 * @param applySequence true if the sequence information should be applied.
 */
public void setApplySequence(boolean applySequence) {

默认情况false下。然后,聚合器可以仅依赖默认值correlationStrategy并收集错误组以供您返回到replyChannelin 标头。

您可以在参考手册中找到的所有信息:

https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/messaging-routing-chapter.html#aggregator

https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/configuration.html#namespace-errorhandler

于 2017-09-29T19:06:29.910 回答