2

我正在尝试学习 MQTT 并一直在玩弄它。我编写了一个用于发布的客户端和一个用于订阅的客户端(见下文)。

如果我运行订阅客户端,然后运行发布客户端(在订阅运行时),那么一切正常。我的订阅客户端正确接收发布到主题的消息。

但是,如果我先运行发布客户端(即,我向主题发布消息)然后运行订阅客户端,我将不会收到任何消息。

换句话说,如果我先连接子客户端,然后在连接子客户端时使用 pub 客户端发布消息,一切正常。但是,如果我先发布消息,然后连接到我的子客户端,我将不会收到任何消息。我的理解是,一旦我与客户端连接并订阅该主题,我应该会收到有关该主题的消息。

我发现了一个类似的问题:Cannot receive already published messages to subscribed topic on mqtt paho,尽管这种情况似乎有点不同。我尝试更改不同的 QoS 设置或 cleanSession 标志,但这并没有解决问题。

任何帮助,将不胜感激!

发布客户端:

public class MQTT_Client_Pub implements MqttCallback{

MqttClient client;

public static void main(String[] args) {

    new MQTT_Client_Pub().mqttPub();
}

public void mqttPub(){
    try {
        this.setConnection();

        // Connect
        client.connect();

        // Create new message
        MqttMessage message = new MqttMessage();
        message.setPayload("A single test message from b112358".getBytes());
        message.setQos(0);

        // Publish message to a topic
        System.out.println("Publishing a message.");
        client.publish("pahodemo/test/b112358", message);

        // Disconnect
        client.disconnect();

      } catch (MqttException e) {
        e.printStackTrace();
      } catch (Exception e){
        e.printStackTrace();
      }
}

public void setConnection(){
    // Client
    try{
        client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_pub");
    } catch (MqttException e) {
        e.printStackTrace();
    }

    // Connection Options
    MqttConnectOptions options = new MqttConnectOptions();

    // Set the will
    options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);

    // Set Callback
    client.setCallback(this);
}

public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("Message delivered to the broker.");
}

public void messageArrived(String topic, MqttMessage message) throws Exception {}

public void connectionLost(Throwable cause) {}

}

订阅客户:

public class MQTT_Client_Sub implements MqttCallback{

MqttClient client;

public static void main(String[] args) {

    new MQTT_Client_Sub().mqttSub();

}

public void mqttSub(){
    try {
        // Set connection
        this.setConnection();

        // Connect
        client.connect();

        // Subscribe

        client.subscribe("pahodemo/test/b112358", 0);
        // Disconnect
        // client.disconnect();

      } catch (MqttException e) {
        e.printStackTrace();
      }
}

public void setConnection(){
    try {
        // Client
        client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_sub");
    } catch (MqttException e) {
        e.printStackTrace();
    }

    // Connection Options
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(false);

    // Set the will
    options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);

    client.setCallback(this);
}

public void deliveryComplete(IMqttDeliveryToken token) {}

public void messageArrived(String topic, MqttMessage message) throws Exception {
    System.out.println("Message Arrived: " + message.getPayload() + " on tipic: " + topic.getBytes());
}

public void connectionLost(Throwable cause) {}

}

4

1 回答 1

7

订阅者连接和订阅前发布的消息只会在以下2种情况下发送

  1. 当消息被发布为保留时。这意味着该主题的最后一条消息将在订阅时发送给新订阅者。这只会传递最后一条消息。

  2. 如果客户端先前已连接并订阅,则断开连接。然后发布一条消息,客户端使用 cleansession = false 再次连接。(当订阅为 QOS1/2 时)

这可能会有所帮助:http ://www.thingsprime.com/?p=2897

于 2015-01-22T18:33:12.210 回答