我正在为 RabbitMQ 开发用于 M2M 解决方案的 POC。我有大量将发布数据的物理设备(目前使用 Java 客户端模拟客户端 - 最终通过 MQTT)。我想要:
- 订阅和记录所有原始数据到数据库
- 按数据类型订阅数据子集,以便我可以独立扩展这些数据类型的解决方案
- 通过交换发布新事件(例如,获取原始事件,使其更有用并通过系统重新提交)
每条消息都有一个路由键,如 key:value.key:value.key:value.messageType:1,来自设备的数据有一个额外的键 FROMDEVICE.MESSAGETYPE:1.key:value... 等。保存的订阅者来自设备的原始数据使用路由键#.FROMDEVICE.#(上面的案例#1)从交换中构建一个队列。接受特定消息类型并对其进行增值的订阅者使用路由键#.MESSAGETYPE:1.#(上面的案例#2)构建一个队列,并将新消息提交到同一个交换器,从路由键中删除 FROMDEVICE 并替换.MESSAGETYPE:1 和 .MESSAGETYPE:101(上面的案例 #3)。然后有一个新消息类型的独立订阅者/队列。
一切都很好,除了我的订阅者应该只从设备接收数据也得到增值数据(MESSAGETYPE:101),即使它应该搜索的routingKey在重新发布/增值消息中不存在。
- FROMDEVICE.MESSAGETYPE:1 ->
- 应该匹配路由键#.FROMDEVICE.#
- 应该匹配#.MESSAGETYPE:1.#
- 消息类型:101
- 应该匹配路由键#.MESSAGETYPE:101.#
- 不应匹配 #.FROMDEVICE.# (但匹配)
仅从设备订阅数据的代码:
public class HandlerWriteEverythingFromDevice {
private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "fromDevice";
/**
* Writes all data from device to a data store.
*/
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] listens for messages from devices - durable!");
channel.basicQos(1);
String routingKey = "#.fromDevice.#".toUpperCase();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
System.out.println(" [*] subscribing to: " + routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL_C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false; //ack back when done
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
int msgCount = 0;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n MESSAGE: '" + message + "'");
Thread.sleep(250); //simulate some time to insert into the db.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
仅订阅 messageType:1 并重新发布 messageType:101 的代码
private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "messageType1";
/**
* Handler for messageType:1
*/
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] listens for messageType:1 and submits messageType:101");
channel.basicQos(1);
String routingKey = "#.messageType:1.#".toUpperCase();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
System.out.println(" [*] subscribing to: " + routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL_C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false; //ack back when done
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
int msgCount = 0;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n MESSAGE: '" + message + "'");
channel.basicPublish(EXCHANGE_NAME,
delivery.getEnvelope().
getRoutingKey().
replaceAll("messageType:1", "messageType:101").
replaceAll(".FROMDEVICE", "").
replaceAll("FROMDEVICE.", "").trim(),
true,
MessageProperties.PERSISTENT_BASIC,
message.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
messageType:101 有发布者代码和订阅者代码,但我认为本次讨论不需要它们。我想知道是否发布到绑定了队列的通道可能是原因,但我尝试创建两个通道(相同的连接对象)并得到相同的结果和更丑陋的代码。