0

我正在为 RabbitMQ 开发用于 M2M 解决方案的 POC。我有大量将发布数据的物理设备(目​​前使用 Java 客户端模拟客户端 - 最终通过 MQTT)。我想要:

  1. 订阅和记录所有原始数据到数据库
  2. 按数据类型订阅数据子集,以便我可以独立扩展这些数据类型的解决方案
  3. 通过交换发布新事件(例如,获取原始事件,使其更有用并通过系统重新提交)

每条消息都有一个路由键,如 key:value.key:value.key:value.messageType:1,来自设备的数据有一个额外的键 FROMDEVICE.MESSAGETYPE:1.key:value... 等。保存的订阅者来自设备的原始数据使用路由键#.FROMDEVICE.#(上面的案例#1)从交换中构建一个队列。接受特定消息类型并对其进行增值的订阅者使用路由键#.MESSAGETYPE:1.#(上面的案例#2)构建一个队列,并将新消息提交到同一个交换器,从路由键中删除 FROMDEVICE 并替换.MESSAGETYPE:1 和 .MESSAGETYPE:101(上面的案例 #3)。然后有一个新消息类型的独立订阅者/队列。

一切都很好,除了我的订阅者应该只从设备接收数据也得到增值数据(MESSAGETYPE:101),即使它应该搜索的routingKey在重新发布/增值消息中不存在。

  • FROMDEVICE.MESSAGETYPE:1 ->
    • 应该匹配路由键#.FROMDEVICE.#
    • 应该匹配#.MESSAGETYPE:1.#
  • 消息类型:101
    • 应该匹配路由键#.MESSAGETYPE:101.#
    • 不应匹配 #.FROMDEVICE.# (但匹配)

仅从设备订阅数据的代码:

public class HandlerWriteEverythingFromDevice {

private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "fromDevice";
/**
 * Writes all data from device to a data store.
 */
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.56.101");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    System.out.println(" [*] listens for messages from devices - durable!");

    channel.basicQos(1);

    String routingKey = "#.fromDevice.#".toUpperCase();

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
    System.out.println(" [*] subscribing to: " + routingKey);

    System.out.println(" [*] Waiting for messages. To exit press CTRL_C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    boolean autoAck = false; //ack back when done
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    int msgCount = 0;
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());

        System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n     MESSAGE: '" + message + "'");
        Thread.sleep(250); //simulate some time to insert into the db.
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
}

仅订阅 messageType:1 并重新发布 messageType:101 的代码

    private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "messageType1";
/**
 * Handler for messageType:1
 */
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.56.101");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    System.out.println(" [*] listens for messageType:1 and submits messageType:101");

    channel.basicQos(1);

    String routingKey = "#.messageType:1.#".toUpperCase();

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
    System.out.println(" [*] subscribing to: " + routingKey);

    System.out.println(" [*] Waiting for messages. To exit press CTRL_C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    boolean autoAck = false; //ack back when done
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    int msgCount = 0;
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());

        System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n     MESSAGE: '" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, 
                delivery.getEnvelope().
                        getRoutingKey().
                        replaceAll("messageType:1", "messageType:101").
                        replaceAll(".FROMDEVICE", ""). 
                        replaceAll("FROMDEVICE.", "").trim(), 
                true, 
                MessageProperties.PERSISTENT_BASIC, 
                message.getBytes());

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

messageType:101 有发布者代码和订阅者代码,但我认为本次讨论不需要它们。我想知道是否发布到绑定了队列的通道可能是原因,但我尝试创建两个通道(相同的连接对象)并得到相同的结果和更丑陋的代码。

4

1 回答 1

1

我建议您对绑定键有点宽松。为了让事情更清楚一点,您应该以不同的方式使用术语绑定键和路由键。路由键是生产者发送的。绑定键用于将队列绑定到主题交换。

因为我不确定你说的是哪个

“应该匹配路由键#.MESSAGETYPE:101.#”

您是否使用路由密钥发送消息,#.MESSAGETYPE:101.#因为那是个坏主意。我想不是,但如果你不是!

让我们假设这是您的绑定密钥。我不确定,因为我没有专门对此进行任何测试,但#之前和之后可能会导致一些问题。您应该考虑路由键的规范。他们必须遵守的某种格式。它可能是可扩展的,但不是完全免费的。这样,您可以使用更具体的绑定键,*而不是#提供更多控制权。

于 2013-03-19T08:51:56.217 回答