4

我尝试使用 TopicExchange 来屏蔽消息。

配置:

    <rabbit:connection-factory id="connectionFactory"  host="localhost" username="guest" password="guest"/>

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<rabbit:queue name="sample.queue"/>

<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>

<rabbit:listener-container connection-factory="connectionFactory" />

零件:

@Component
public class JmsComponent {

    private final Logger log = LoggerFactory.getLogger(JmsComponent.class);

    private final TopicExchange exchange = new TopicExchange("sample.exchange");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    private String received;

    public void send(String msg) {
        rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
    }

    public void bindToKey(String keyMask) {
        BindingBuilder.bind(queue).to(exchange).with(keyMask);
        rabbitTemplate.setExchange(exchange.getName());
    }


    public void sendByKey(String key, String msg) {
        rabbitTemplate.convertAndSend(exchange.getName(), key, new SimpleMessage(msg));
    }

    @RabbitListener(queues = "sample.queue")
    public void handle(SimpleMessage message) {
        log.info("================ Received  " + message.getMsg());
        received = message.getMsg();
    }

    public String getReceived() {
        return received;
    }

当我使用发送(包括 TopicExchange 之前)时 - 一切正常。消息被直接发送到队列,handle() 已经接收到它。但是对于 TopicExchange.... 我尝试使用它:

@Test
public void bind() throws InterruptedException {
    jmsComponent.bindToKey("qq");
    jmsComponent.sendByKey("qq", "message");
    Thread.sleep(5000);
    Assert.isTrue("message".equals(jmsComponent.getReceived()));
}

测试总是失败,但在日志中我看到了这个 - DEBUG osamqp.rabbit.core.RabbitTemplate - Publishing message on exchange [sample.exchange], routingKey = [qq] 怎么了???谢谢

4

2 回答 2

5

这个...

BindingBuilder.bind(queue).to(exchange).with(keyMask);

Binding...除了创建一个对象然后将其丢弃之外什么都不做。您需要获取该Binding对象并致电declareBinding管理员。您还需要申报交换。

由于您的上下文中有管理员;最简单的方法是将<rabbit:exchange/>(连同绑定)添加到上下文中。请参阅文档

<rabbit:queue id="myQueue" name="sample.queue"/>

<topic-exchange name="sample.exchange">
    <bindings>
        <binding queue="myQueue" pattern="bucket.#"/>
    </bindings>
</topic-exchange>

顺便说一句,主题交换旨在通过关键模式进行路由;如果您只是想使用固定密钥进行路由/绑定,例如qq使用直接交换。请参阅RabbitMQ 教程

于 2016-02-11T13:50:52.320 回答
3

我更改了 Gary Russell 的组件使用答案:

  • 我加了

    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @PostConstruct
    public void init(){
        rabbitAdmin.declareExchange(exchange);
    }
    
  • 并修改绑定方法:

    public void bindToKey(String keyMask) {
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(keyMask);
        rabbitAdmin.declareBinding(binding); // re-declare binding if mask changed
        rabbitTemplate.setExchange(exchange.getName());
    }
    
  • 并且测试在它之后变成了作品!

此外,我在运行时添加了更改绑定掩码:

@Test
public void bind() throws InterruptedException {
    jmsComponent.bindToKey("qq");
    jmsComponent.sendByKey("qq", "message");
    Thread.sleep(5000);
    Assert.isTrue("message".equals(jmsComponent.getReceived()));

    jmsComponent.bindToKey("eeeee");
    jmsComponent.sendByKey("eeeee", "message one");
    Thread.sleep(5000);
    Assert.isTrue("message one".equals(jmsComponent.getReceived()));
}

所有作品。

于 2016-02-11T14:47:49.347 回答