0

我正在尝试发送具有不同主题的消息,然后将消费者配置为它想要收听的主题。

我的想法是使用单个目标“domainMessage”并使用自定义分区策略。我有一个枚举,我只是使用该值作为 partitionKey,partitionStrategy 将只返回密钥(假设密钥将始终等于生产者端的分区计数)。

这行得通吗?如果是这样,我不确定如何配置消费者。

我的制片人有以下 application.properties

spring.cloud.stream.bindings.output.destination=domainMessages
spring.cloud.stream.bindings.output.producer.partition-key-extractor-class=publisher.partitionstrategy.PartitionKeyExtractorImpl
spring.cloud.stream.bindings.output.producer.partition-selector-class=publisher.partitionstrategy.PartitionSelectorStrategyImpl
spring.cloud.stream.bindings.output.producer.partition-count=3

我的 PartitionKeyExtractorImpl 看起来像

@Override
    public Object extractKey(Message<?> message) {
        DomainMessage payload = (DomainMessage) message.getPayload();
        return payload.getType();
    }

我的 PartitionStrategyImpl 看起来像

@Override
    public int selectPartition(Object key, int partitionCount) {
        return (int)key;
    }

我的消费者 application.properties 看起来像

spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true

假设payload.getType()可以返回 1-3 之间的值。如何将使用者配置为仅收听 partitionKey 为 1 和 3 的消息?

4

1 回答 1

2

Spring Cloud Stream 对于声明队列和交换器非常固执。

spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true
spring.cloud.stream.bindings.input.consumer.instance-index=0

(注意实例索引)

这将与路由键绑定domainMessage.group01-0交换。domainMessagedomainMessage-0

如果您希望在单个实例中使用多个分区,只需执行

spring.cloud.stream.bindings.input.destination=domainMessage-0,domainMessage-1

(并删除instance-indexand partitioned=true)将不起作用,因为它将每个队列绑定到相应的交换(例如domainMessage-0使用路由键#)。

一种解决方案是这样做,但手动添加一个交换到交换绑定,以使用适当的路由密钥将每个消费者交换 ( domainMessage-n) 绑定到上游交换 ( )。domainMessage

目前还没有办法自动从单个instance-index.

于 2016-09-13T20:04:23.163 回答