1

我有以下工作流程。

  1. 入站通道

  2. 分离器

  3. 拆分通道的任务执行器 - 所有线程执行相同的工作流。

3.a. 构造请求

3.b。网关消息端点的服务激活器包装器。

3.c。带有错误通道配置的 http-outbound-gateway 上的网关包装器(在调用 http-outbound-gateway 时处理异常)

3.d。http-出站网关

  1. 聚合器

  2. spring 集成工作流中的响应。

如果在 3.d 中发生异常,则控制转到为网关错误通道配置的服务激活器。我将以下内容从失败的消息复制到新标头到传递给错误通道的标头。

一个。相关性Id b. 序号 C. 序列大小

但是在聚合拆分器响应时,DefaultAggregatingMessageGroupProcessor.java 会删除冲突的标头,并在将控制权提供给聚合器之前删除错误通道和回复通道。

因此,一旦聚合器完成它的操作,它就无法找到回复或错误通道,并导致异常。

我正在使用 spring-integration-core 版本 2.2.1,但我不确定为什么在标头聚合期间会删除回复通道和错误通道。

任何有关解决此问题的意见都会有很大帮助。

谢谢你 :)

编辑 1: 非常感谢 Gary 帮助我解决这个问题。我正在分享我当前的配置

<!-- SPLITTER -->
<int:splitter id="dentalSplitter" ref="dentalServiceSplitter"
    method="getDentalServiceList" input-channel="dentalServiceSplitterChannel"
    output-channel="dentalSplitterTaskChannel" />

<int:channel id="dentalSplitterTaskChannel">
    <int:dispatcher task-executor="dentalTaskExecutor" />
</int:channel>

<int:chain input-channel="dentalSplitterTaskChannel" output-channel="dentalGatewayChannel">
    <int:header-enricher>
        <int:header name="CHAIN_START_TIME" expression="T(System).currentTimeMillis()" overwrite="true" />
    <int:object-to-json-transformer content-type="application/json"/>               
</int:chain>

<int:service-activator input-channel="dentalGatewayChannel" ref="dentalGatewayWrapper" output-channel="dentalReplyChannel" />
<int:gateway id="dentalGatewayWrapper" default-request-channel="dentalCostEstimateRequestChannel" error-channel="dentalErrorChannel"/>

<int-http:outbound-gateway id="dentalGateway"
    url-expression="@urlBuilder.build('${service.endpoint}')"  
    http-method="POST" request-factory="clientHttpRequestFactory"  
    request-channel="dentalCostEstimateRequestChannel" extract-request-payload="true" 
    expected-response-type="com.dental.test.DentalResponse">
    <int-http:request-handler-advice-chain> 
         <ref bean="logChainTimeInterceptor" /> 
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

<!-- EXCEPTION -->
<int:chain input-channel="dentalErrorChannel" output-channel="dentalAggregatorChannel">
    <int:transformer ref="commonErrorTransformer" method="dentalGracefulReturn"/>
</int:chain>

<!-- SUCCESS -->
<int:chain input-channel="dentalReplyChannel" output-channel="dentalAggregatorChannel">
        <int:filter discard-channel="dentalErrorChannel"
        expression="T(com.dental.util.InvocationOutcomeHelper).isOutcomeSuccess(payload?.metadata?.outcome?.code,payload?.metadata?.outcome?.message)" />
</int:chain>

<!-- AGGREGATION -->

<int:chain input-channel="dentalAggregatorChannel" output-channel="wsDentalServiceOutputChannel" >
    <int:aggregator ref="dentalServiceAggregator" />
    <int:service-activator ref="dentalResponseServiceActivator" />
</int:chain>

我注意到的是,通过网关时的每个拆分通道都会为错误和回复创建一个新的临时通道,并且在从网关返回响应后,它会保留保留的(原始入站)错误和回复通道标头。正如您所提到的,在控制到达错误转换器之后,保留保留的标头的流程被破坏,并且聚合消息组处理器接收到三个不同的临时通道实例,因此将它们删除。我计划拥有一个自定义消息组处理器并修改用于聚合标头的冲突解决策略并提出此配置。

<bean id="channelPreservingAggregatingMessageHandler" class="org.springframework.integration.aggregator.AggregatingMessageHandler">
    <constructor-arg name="processor" ref="channelPreservingMessageGroupProcessor"/>
</bean>

不过,我还没有对此进行测试。但是基于这个讨论,这看起来不是一个可行的解决方案。

看起来我在网关中的错误处理配置不正确。但是,我对您的这句话感到困惑“与其直接转发消息,不如简单地处理错误流中的错误并将结果正常返回给网关“包装器””。如果我删除错误通道,当异常发生时我将如何恢复控制?可能是我想在这里理解一些东西。你能详细说明一下吗?

4

1 回答 1

3

When asking questions about scenarios such as this, you generally need to show your configuration. However, I suspect you are forwarding the message from the error flow directly to the aggregator.

This is like doing a GOTO in code and breaks the scoping.

It won't work because the replyChannel header in the error message is for the gateway "wrapper", not the original upstream inbound gateway. When the aggregator gets conflicting headers, it has no choice but to drop the headers (you will see a DEBUG log message to that effect).

Instead of forwarding the message directly, simply handle the error on your error flow and return the result normally to the gateway "wrapper" (simply omit the error channel on the last element on the error flow).

The gateway will then fix up the reply so it is consistent with other messages (good and bad) and forward it to the aggregator.

You don't need to mess with headers in your error flow, just return the value you want to be aggregated along with the good results.

You should really update to a current release, or at least the latest in the 2.2.x line (2.2.6).

于 2014-09-25T05:22:02.013 回答