我有一些通过调用 DirectExchange 来调用 Spring Integration 的客户端代码,例如
Map<String, Object> result = (Map<String, Object>) rabbitTemplate.convertSendAndReceive("rpc", "KEY", map);
我可以看到调用了集成流,组装了正确的结果,然后将 AMQP 消息发送回,但从未收到该消息。我必须使用 XML 配置,并且我尝试了通道适配器和网关的各种组合,取得了不同程度的成功,但没有任何组合始终返回。
只是一个入站网关会调用流,客户端代码会看到响应,但一半的响应是输入本身:
<int-amqp:inbound-gateway auto-startup="true"
request-channel="requestChannel"
reply-channel="responseChannel"
message-converter="jacksonMessageConverter"
queue-names="rpc.KEY"/
通道适配器也调用流,但客户端代码从不使用响应,无论是否使用交换和路由表达式,尽管 AMQP 消息似乎已组装和发布。
<int-amqp:inbound-channel-adapter auto-startup="true"
channel="requestChannel2"
message-converter="jacksonMessageConverter"
mapped-request-headers="*"
queue-names="rpc.KEY"/>
<int-amqp:outbound-channel-adapter auto-startup="true"
mapped-request-headers="*"
exchange-name-expression="headers.amqp_receivedExchange"
routing-key-expression="headers.amqp_replyTo"
channel="responseChannel2"
amqp-template="rpcTemplate"/>
使用这两种方法,我缺少什么配置步骤?
编辑:完整示例上下文;使用入站网关的响应时间减半;使用通道适配器不会。无论我一个接一个地使用链或单个通道,行为都是相同的。
<bean id = "jacksonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
// to be clear, tests use gateway or channel adapters, not both at the same
// time; just included here for completeness on the example
<int-amqp:inbound-gateway auto-startup="true"
request-channel="requestChannel2"
reply-channel="responseChannel2"
message-converter="jacksonMessageConverter"
amqp-template="rpcTemplate"
queue-names="rpc.KEY"/>
<int-amqp:inbound-channel-adapter auto-startup="true"
channel="requestChannel2"
message-converter="jacksonMessageConverter"
mapped-request-headers="*"
concurrent-consumers="5"
queue-names="rpc.KEY"/>
<int-amqp:outbound-channel-adapter auto-startup="true"
mapped-request-headers="*"
channel="responseChannel2"
amqp-template="rpcTemplate"/>
<int:chain input-channel="requestChannel2" output-channel="responseChannel2">
<int:header-enricher id="jsonContentType">
<int:header name="Content-Type" value="application/json"/>
</int:header-enricher>
<int-http:outbound-gateway id="httpOutbound2"
url="http://api.icndb.com/jokes/{stem}"
http-method="GET"
expected-response-type="java.lang.String">
<int-http:uri-variable name="stem" expression="payload.stem" />
</int-http:outbound-gateway>
<int:json-to-object-transformer
type="java.util.Map"/>
<int:transformer>
<int-script:script lang="groovy">
<![CDATA[
def outputVariables = ['joke':payload.value.joke]
return outputVariables
]]>
</int-script:script>
</int:transformer>
</int:chain>
<int:channel id="requestChannel2"/>
<int:channel id="responseChannel2"/>
在测试中快速而肮脏地应用客户端代码:
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setExchange("rpc");
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setReplyTimeout((3000L));
Map<String, Object> map = new HashMap<>();
map.put("stem", "random");
int failures = 0;
for (int i=0;i<2;i++) {
Map<String, Object> result = (Map<String, Object>) rabbitTemplate.convertSendAndReceive("rpc", "YAY", map);
if (result == null || !result.containsKey("joke")) {
failures++;
}
}
assertEquals(0, failures);