3

我已经使用 Paho mqttv3 编写了一个 MQTT 客户端。这是客户端的代码:

/**
 * Thin wrapper around a Paho {@code MqttClient} that connects to a broker,
 * subscribes to a fixed topic filter and publishes messages.
 *
 * <p>Connection loss is handled by the {@code MQTT_Callback} registered in the
 * constructor, which calls back into {@link #reconnect()} and
 * {@link #subscribe(String, int)}.
 *
 * <p>NOTE(review): the client ID is the fixed string {@code "HBA"}. Under the
 * MQTT protocol a broker drops an existing connection when a second client
 * connects with the same client ID, which the dropped side typically sees as an
 * unexpected end of stream. Confirm that only one instance of this client ever
 * runs against a given broker.
 */
public class MQTT_Client {

    private static final Logger log = Logger.getRootLogger();

    /** Client identifier presented to the broker. */
    private static final String CLIENT_ID = "HBA";

    private final MqttClient mqtt;
    private final MqttConnectOptions conOpt;
    private final String topicFilter;

    /** Seconds the callback waits between reconnection attempts. */
    private final int connectionTimeout = 30;

    /**
     * Creates the underlying Paho client and registers the callback. No network
     * connection is opened until {@link #start()} is called.
     *
     * @param brokerUrl URL of the MQTT broker to connect to
     * @throws IllegalStateException if the underlying client cannot be created;
     *         previously the failure was only logged and the instance was left
     *         unusable (every later call failed with a NullPointerException)
     */
    public MQTT_Client(String brokerUrl) {
        this.topicFilter = "/bmpiips/+/hb/out";
        this.conOpt = new MqttConnectOptions();
        this.conOpt.setCleanSession(true);
        try {
            this.mqtt = new MqttClient(brokerUrl, CLIENT_ID, new MemoryPersistence());
            log.info("Connecting to " + brokerUrl + " with client ID " + mqtt.getClientId());
            mqtt.setCallback(new MQTT_Callback(this));
        } catch (MqttException e) {
            log.error("Unable to create MQTT client for " + brokerUrl, e);
            throw new IllegalStateException("Unable to create MQTT client for " + brokerUrl, e);
        }
    }

    /**
     * Connects to the broker and subscribes to the configured topic filter.
     * Failures are logged, not thrown.
     */
    public void start() {
        reconnect();
        if (!mqtt.isConnected()) {
            // The connect failure has already been logged by reconnect();
            // subscribing on a disconnected client can only fail again.
            log.warn("Unable to subscribe to " + topicFilter);
            return;
        }
        try {
            subscribe(topicFilter, 1);
        } catch (MqttException e) {
            log.warn("Unable to subscribe to " + topicFilter, e);
        }
    }

    /**
     * Opens the connection if it is not already open.
     *
     * @throws MqttSecurityException if the broker rejects the connection for
     *         security reasons
     * @throws MqttException on any other connection failure
     */
    private void connect() throws MqttSecurityException, MqttException {
        if (mqtt.isConnected()) {
            return;
        }
        mqtt.connect(conOpt);
        if (mqtt.isConnected()) {
            log.info("Connected to MQTT Broker.");
        }
    }

    /**
     * @return {@code true} if the underlying client reports an open connection
     */
    public boolean isConnected() {
        return mqtt.isConnected();
    }

    /**
     * Publishes a message and blocks until delivery has completed.
     *
     * @param topicName topic to publish to
     * @param payload   message body
     * @param qos       quality-of-service level for the message
     * @return {@code true} if the message was delivered, {@code false} if an
     *         {@code MqttException} occurred (the exception is logged)
     */
    public boolean publish(String topicName, byte[] payload, int qos) {
        try {
            MqttTopic topic = mqtt.getTopic(topicName);

            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
            log.info("Publishing to topic: \"" + topicName + "\" Message size: " + payload.length + " bytes");

            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            return true;
        } catch (MqttException e) {
            log.error("Unable to publish to topic \"" + topicName + "\"", e);
            return false;
        }
    }

    /**
     * Subscribes to a topic filter.
     *
     * @param topicName topic filter to subscribe to
     * @param qos       maximum quality-of-service level for received messages
     * @throws MqttSecurityException if the broker refuses the subscription for
     *         security reasons
     * @throws MqttException on any other failure, e.g. when not connected
     */
    public void subscribe(String topicName, int qos) throws MqttSecurityException, MqttException {
        mqtt.subscribe(topicName, qos);
        log.info("Subscribed to topic \"" + topicName + "\"");
    }

    /**
     * Attempts to connect if the client is currently disconnected. Failures are
     * logged, not thrown; callers should check {@link #isConnected()} afterwards.
     */
    public void reconnect() {
        if (mqtt.isConnected()) {
            // Not a failure: the caller simply asked for something already done.
            log.info("Allready connected...");
            return;
        }
        try {
            connect();
        } catch (MqttException e) {
            // MqttSecurityException is a subclass of MqttException, so this one
            // handler covers both cases the original caught separately.
            log.error("Coud not reconnect...", e);
        }
    }

    /**
     * Disconnects from the broker.
     *
     * @throws MqttException if the disconnect fails
     */
    public void disconnect() throws MqttException {
        // The synchronous MqttClient.disconnect() is documented to block until
        // the disconnect has completed, so no polling loop is needed; the
        // original empty while-loop could only burn CPU.
        mqtt.disconnect();
        if (!mqtt.isConnected()) {
            log.info("Disconnected from the MQTT Broker.");
        }
    }

    /**
     * @return the topic filter this client subscribes to on start
     */
    public String getTopicFilter() {
        return topicFilter;
    }

    /**
     * @return the delay, in seconds, used between reconnection attempts
     */
    public int getConnectionTimeout() {
        return connectionTimeout;
    }

}

这是回调类的代码:

/**
 * Paho callback that hands incoming messages to a thread pool and keeps trying
 * to re-establish the connection (and the subscription) after it is lost.
 */
public class MQTT_Callback implements MqttCallback {

    private static final Logger log = Logger.getRootLogger();
    private static final MQTT_ThreadPool threadPool = new MQTT_ThreadPool();

    private final MQTT_Client mqtt;

    /**
     * @param mqtt the client whose connection this callback monitors
     */
    public MQTT_Callback(MQTT_Client mqtt) {
        this.mqtt = mqtt;
    }

    /**
     * Retries the connection at a fixed interval until it succeeds, then
     * re-subscribes to the client's topic filter. Blocks the calling thread for
     * as long as the broker is unreachable.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.error("Disconnected from MqttBroker", cause);
        while (!mqtt.isConnected()) {
            try {
                log.info("Sleeping for : " + mqtt.getConnectionTimeout() + " seconds...");
                // Long arithmetic so a large timeout cannot overflow int.
                Thread.sleep(mqtt.getConnectionTimeout() * 1000L);
            } catch (InterruptedException e) {
                log.error("Coud not sleep...");
                // Restore the interrupt flag and stop retrying: with the flag
                // set, every further sleep() would throw immediately and the
                // loop would spin.
                Thread.currentThread().interrupt();
                return;
            }

            mqtt.reconnect();
            if (!mqtt.isConnected()) {
                // reconnect() has already logged the failure; subscribing on a
                // disconnected client would only log a second, spurious error.
                continue;
            }

            try {
                mqtt.subscribe(mqtt.getTopicFilter(), 1);
            } catch (MqttException e) {
                log.error("Unable to subscribe to " + mqtt.getTopicFilter(), e);
            }
        }
    }

    /**
     * Logs that a published message has been delivered.
     *
     * @param token delivery token of the completed message
     */
    @Override
    public void deliveryComplete(MqttDeliveryToken token) {
        log.info("Message delivered successfully");
    }

    /**
     * Logs the arrival of a message and passes it to the shared thread pool.
     *
     * @param topic   topic the message was received on
     * @param message the received message
     * @throws Exception if the message cannot be handed to the thread pool
     */
    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
        log.info("Message received from topic: " + topic.getName() + " Message size: " + message.getPayload().length
                + " bytes");
        threadPool.execute(topic, message);
    }

}

它工作正常,但有时我会遇到一个奇怪的异常,它会终止连接:

2013-07-09 10:36:05.959 ERROR root:23 - Disconnected from MqttBroker
   Connection lost (32109) - java.io.EOFException
   at       org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:119)
at java.lang.Thread.run(Unknown Source)
   Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(Unknown Source)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:51)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:86)
... 1 more

如果有人遇到过这种情况,请给我一些提示。

4

0 回答 0