0

我创建了一个示例 HttpSource 和 HttpSink。我的 application.properties 看起来像这样

Source
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.partitionCount=2


Sink
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

HttpSource

@RestController
@EnableBinding(Source.class)
 public class SampleSource {

 @Autowired
 private MessageChannel output;

 @RequestMapping(path="/message",method=RequestMethod.POST)
 public void sendMessage(@RequestBody String name){
    output.send(MessageBuilder.withPayload("Hello, "+name).build());
 }
}

HttpSink

@EnableBinding(Sink.class)
public class SampleSink {

 @ServiceActivator(inputChannel=Sink.INPUT)
 public void sendMessage(String name){
    System.out.println(name);
 }
}

我将这两个应用程序都部署到了 Pivotal Cloud Foundry。HttpSource 有一个端点,它在调用时将消息发送到名为“greetings”的主题交换。然后我将 HttpSink 缩放为有 2 个实例。这创建了两个队列并绑定到“问候”交换。

现在,当我到达端点时,我发现消息已发送到两个队列。我知道这一点是因为我跟踪了日志并发现该消息被打印了两次。

如何使消息仅发送到其中一个队列?

编辑:

我没有在 Pivotal Cloud Foundry 中扩展 HttpSink,而是将 HttpSink 部署为两个不同的应用程序。但在 application.properties 中它们属于同一组。其中一个的instanceIndex=0,另一个instanceIndex=1。

即使是现在,我也得到一个绑定为“#”的队列和两个消费者到该队列。

如何让不同的 HttpSink 实例创建自己的队列,并根据 partitionKey 将来自 HttpSource 的消息路由到其中一个?

4

1 回答 1

0

我之前没有注意到 - 您consumer.在物业中失踪了:

spring.cloud.stream.bindings.input.consumer.partitioned=true

请参阅文档

您是否使用 IDE 创建属性文件?当我复制您的属性时,我的 STS (3.8.1) 版本将此标记为问题。

我刚刚运行了一个测试,队列被正确命名并使用正确的密钥绑定到交换。

编辑

为了让它在 PCF 上正确扩展,我还必须注释掉该instanceIndex属性(可能是因为它覆盖了 PCF 环境属性),否则我有 2 个消费者在-0队列中。删除该属性后,我得到了预期的 2 个队列。

于 2016-08-17T13:15:23.423 回答