我正在尝试使用 Spring Integration 配置以下内容:
- 向频道发送消息。
- 将此消息发布到与 n 个消费者的 rabbit fanout (pub/sub) 交换。
- 每个消费者提供一个响应消息。
- 在将这些响应返回给原始客户端之前,让 Spring Integration 聚合这些响应。
到目前为止,我有一些问题......
我正在使用发布订阅通道来设置
apply-sequence="true"
属性,以便设置correlationId、sequenceSize 和sequenceNumber 属性。这些属性被DefaultAmqpHeaderMapper
.DEBUG headerName=[correlationId] WILL NOT be mapped
sequenceSize 属性仅设置为 1,即使在扇出交换中注册了 2 个队列。大概这意味着消息会过早地从聚合器中释放。我希望这是因为我误用了发布订阅频道以便使用
apply-sequence="true"
,而且说只有一个订阅者是非常正确的,int-amqp:outbound-gateway
.
我的出站 Spring 配置如下:
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
我的 rabbitMQ 配置如下:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
消费者看起来像这样:
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
任何建议都会很棒,我怀疑我在某个地方弄错了棍子……
基于 Gary 评论的新出站 spring 配置:
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>