I have written an MQTT client using Paho mqttv3. Here is the client code:
public class MQTT_Client {

    private static final Logger log = Logger.getRootLogger();

    private MqttClient mqtt;
    private MqttConnectOptions conOpt;
    private String topicFilter;
    // Seconds to wait between reconnect attempts.
    private int connectionTimeout = 30;

    public MQTT_Client(String brokerUrl) {
        this.topicFilter = "/bmpiips/+/hb/out";
        try {
            String clientId = "HBA";
            mqtt = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
            log.info("Connecting to " + brokerUrl + " with client ID " + mqtt.getClientId());
            conOpt = new MqttConnectOptions();
            conOpt.setCleanSession(true);
            mqtt.setCallback(new MQTT_Callback(this));
        } catch (MqttException e) {
            log.error("Could not create the MQTT client", e);
        }
    }

    public void start() {
        reconnect();
        try {
            subscribe(topicFilter, 1);
        } catch (MqttException e) {
            log.warn("Unable to subscribe to " + topicFilter, e);
        }
    }

    private void connect() throws MqttSecurityException, MqttException {
        if (!mqtt.isConnected()) {
            mqtt.connect(conOpt);
            if (mqtt.isConnected()) {
                log.info("Connected to MQTT Broker.");
            }
        }
    }

    public boolean isConnected() {
        return mqtt.isConnected();
    }

    public boolean publish(String topicName, byte[] payload, int qos) {
        try {
            MqttTopic topic = mqtt.getTopic(topicName);
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
            log.info("Publishing to topic: \"" + topicName + "\" Message size: " + payload.length + " bytes");
            MqttDeliveryToken token = topic.publish(message);
            // Block until the broker has acknowledged the message.
            token.waitForCompletion();
            return true;
        } catch (MqttException e) {
            log.error("Could not publish to " + topicName, e);
            return false;
        }
    }

    public void subscribe(String topicName, int qos) throws MqttSecurityException, MqttException {
        mqtt.subscribe(topicName, qos);
        log.info("Subscribed to topic \"" + topicName + "\"");
    }

    public void reconnect() {
        if (!mqtt.isConnected()) {
            try {
                connect();
            } catch (MqttException e) {
                // MqttSecurityException is a subclass of MqttException, so one catch covers both.
                log.error("Could not reconnect...", e);
            }
        } else {
            log.error("Already connected...");
        }
    }

    public void disconnect() throws MqttException {
        // The synchronous MqttClient.disconnect() blocks until the disconnect
        // has completed, so no polling loop is needed afterwards.
        mqtt.disconnect();
        if (!mqtt.isConnected()) {
            log.info("Disconnected from the MQTT Broker.");
        }
    }

    public String getTopicFilter() {
        return topicFilter;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }
}
Here is the code for the callback class:
public class MQTT_Callback implements MqttCallback {

    private static final Logger log = Logger.getRootLogger();
    private static final MQTT_ThreadPool threadPool = new MQTT_ThreadPool();

    private final MQTT_Client mqtt;

    public MQTT_Callback(MQTT_Client mqtt) {
        this.mqtt = mqtt;
    }

    @Override
    public void connectionLost(Throwable cause) {
        log.error("Disconnected from MqttBroker", cause);
        // Keep retrying until the connection is back, sleeping between attempts.
        while (!mqtt.isConnected()) {
            try {
                log.info("Sleeping for " + mqtt.getConnectionTimeout() + " seconds...");
                Thread.sleep(mqtt.getConnectionTimeout() * 1000L);
            } catch (InterruptedException e) {
                log.error("Interrupted while waiting to reconnect", e);
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return;
            }
            mqtt.reconnect();
            if (!mqtt.isConnected()) {
                continue; // reconnect failed, so do not try to subscribe yet
            }
            try {
                mqtt.subscribe(mqtt.getTopicFilter(), 1);
            } catch (MqttException e) {
                log.error("Unable to subscribe to " + mqtt.getTopicFilter(), e);
            }
        }
    }

    @Override
    public void deliveryComplete(MqttDeliveryToken token) {
        log.info("Message delivered successfully");
    }

    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
        log.info("Message received from topic: " + topic.getName() + " Message size: " + message.getPayload().length
                + " bytes");
        // Hand the message off to a worker so the callback returns quickly.
        threadPool.execute(topic, message);
    }
}
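In case it helps to reason about the retry logic without a broker, the loop in connectionLost above boils down to the pattern sketched here. This is only a stripped-down illustration, not the real client: the `Connector` interface and the `RetryLoop` class are hypothetical stand-ins for MQTT_Client, and unlike my callback the sketch gives up after a fixed number of attempts.

```java
/** Hypothetical stand-in for the MQTT client so the loop can run without a broker. */
interface Connector {
    boolean isConnected();
    void connect() throws Exception;
    void subscribe() throws Exception;
}

final class RetryLoop {
    private RetryLoop() {}

    /**
     * Sleeps delayMillis, then tries to connect, up to maxAttempts times.
     * Subscribes only once the connection is actually up.
     * Returns the number of connect attempts used, or -1 if every attempt
     * failed or the thread was interrupted while sleeping.
     */
    static int reconnect(Connector c, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return -1;
            }
            try {
                c.connect();
            } catch (Exception e) {
                // Connect failed; the isConnected() check below decides what happens next.
            }
            if (c.isConnected()) {
                try {
                    c.subscribe();
                } catch (Exception e) {
                    // A real client would log the failed subscription here.
                }
                return attempt;
            }
        }
        return -1;
    }
}
```

With the real client, `connect()` would wrap `mqtt.connect(conOpt)` and `subscribe()` would wrap `mqtt.subscribe(topicFilter, 1)`.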
It works fine, but occasionally I get a strange exception that kills the connection:
2013-07-09 10:36:05.959 ERROR root:23 - Disconnected from MqttBroker
Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:119)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(Unknown Source)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:51)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:86)
    ... 1 more
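One thing I have not ruled out yet: the client ID is hard-coded to "HBA". As far as I understand the MQTT specification, a broker drops the existing connection when another client connects with the same ID, so a second running instance might produce exactly this kind of EOFException. That is only a guess on my part. A minimal sketch of a per-instance ID follows; the `ClientIds` helper is hypothetical and assumes a short prefix, since MQTT 3.1 limits client IDs to 23 characters.

```java
import java.util.UUID;

final class ClientIds {
    private ClientIds() {}

    /** MQTT 3.1 limits client identifiers to 23 characters. */
    static final int MAX_LENGTH = 23;

    /**
     * Builds "<prefix>-<random hex>" and truncates it to MAX_LENGTH.
     * Assumes the prefix is short enough to leave room for the random part.
     */
    static String unique(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", "");
        String id = prefix + "-" + suffix;
        return id.length() > MAX_LENGTH ? id.substring(0, MAX_LENGTH) : id;
    }
}
```

In the constructor this would replace the literal, for example `String clientId = ClientIds.unique("HBA");`.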
If anyone has run into this, please give me some pointers.