我是新手,正在学习这个协议。在阅读有关 MQTT 的 wiki 时。第一行“MQTT[1] 是基于发布-订阅的“轻量级”消息传递协议,用于 TCP/IP 协议之上。”
这是否意味着 MQTT 仅支持 Topic 而不支持或使用队列?
因为,即使我使用可用的客户端 API(fusesource 和 paho)进行检查。我找到了队列的客户端 API。
ActiveMQ 支持 MQTT,您可以在这里阅读更多内容:
http://activemq.apache.org/mqtt.html
最近几周,MQ Telemetry Transport 中 Message Queue Telemetry Transport 的首字母缩写词发生了变化,因为 MQTT 没有队列概念!:-)
正如@hardillb 所说,发布和订阅操作是在主题上执行的。
“队列”的概念只有在您使用高级“干净会话”来实现虚假功能时才会实现,这意味着代理保存在客户端离线的主题上发布的消息,以便当客户端返回在线时,它将收到消息。但是,此实现与代理严格相关(与队列无关)。
我写了以下免费电子书(专注于 Microsoft 技术),但第 3 章专门介绍 MQTT 协议本身。
http://www.embedded101.com/DevelopM2MIoTDevicesEbook.aspx
保罗。
是的,MQTT 只是主题,没有队列支持
MQTT 不了解 JMS 或许多其他消息传递系统中的队列。MQTT 只理解可以发布和订阅的主题。
The solution I came to using ActiveMQ 5.13.1 as the MQTT broker was to include a composite topic to which the MQTT client will send to. ActiveMQ will pick up the messages written to the topic and write them to a queue (or multiple queues or topics) as per your configuration. Keep in mind that the topic name will have the periods changed to forward slashes, so for instance, the topic name in the given example, LOCAL.EC.T
, will become LOCAL/EC/T
. This was true at least for my test setup using Eclipse Paho Client MQTTv3 1.0.2 and I believe this is an MQTT implementation detail.
${ACTIVEMQ_HOME}/conf/activemq.xml configuration.
<beans>
<broker>
...
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeTopic name="LOCAL.EC.T">
<forwardTo>
<queue physicalName="LOCAL.EC.Q.1" />
<queue physicalName="LOCAL.EC.Q.2" />
</forwardTo>
</compositeTopic>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
...
</broker>
</beans>
MQTT not supports queue if you want to implement Queue with ActiveMQ use STOMP protocol check this link https://github.com/asantos2000/RabbitMQGozirraStompAndroid
if you want to use Topic just used QUEUE_NAME= /topic/nameof_topic and if you want to use Queue used destination QUEUE_NAME=/queue/nameof_queue
for filter message in queue add selectors
Map<String,String> header=new HashMap<String, String>();
header.put("selector","(title = 'selector_name')");
con.subscribe(QUEUE_NAME, new Listener() {
@Override
public void message(Map header, String body ) {
Log.d(TAG,"onMessage()");
Log.d(TAG,"message is " + body);
Log.d(TAG,"header is " + header.toString());
message = "\n("+ counterReceive +")<-- " + body;
myHandler.post(myRunnable);
counterReceive++;
}
},header);
you will get only those message which header contain your selector name