我创建了一个示例 HttpSource 和 HttpSink。我的 application.properties 看起来像这样
Source
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.partitionCount=2
Sink
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
HttpSource
@RestController
@EnableBinding(Source.class)
public class SampleSource {
@Autowired
private MessageChannel output;
@RequestMapping(path="/message",method=RequestMethod.POST)
public void sendMessage(@RequestBody String name){
output.send(MessageBuilder.withPayload("Hello, "+name).build());
}
}
HttpSink
@EnableBinding(Sink.class)
public class SampleSink {
@ServiceActivator(inputChannel=Sink.INPUT)
public void sendMessage(String name){
System.out.println(name);
}
}
我将这两个应用程序都部署到了 Pivotal Cloud Foundry。HttpSource 有一个端点,它在调用时将消息发送到名为“greetings”的主题交换。然后我将 HttpSink 缩放为有 2 个实例。这创建了两个队列并绑定到“问候”交换。
现在,当我到达端点时,我发现消息已发送到两个队列。我知道这一点是因为我跟踪了日志并发现该消息被打印了两次。
如何使消息仅发送到其中一个队列?
编辑:
我没有在 Pivotal Cloud Foundry 中扩展 HttpSink,而是将 HttpSink 部署为两个不同的应用程序。但在 application.properties 中它们属于同一组。其中一个的instanceIndex=0,另一个instanceIndex=1。
即使是现在,我也得到一个绑定为“#”的队列和两个消费者到该队列。
如何让不同的 HttpSink 实例创建自己的队列,并根据 partitionKey 将来自 HttpSource 的消息路由到其中一个?