我是 MQTT 新手,有几个问题想请大家帮忙。背景信息:我正在做一个需要使用 MQTT 协议的项目,并且程序必须用 Java 编写。
MQTT 客户端可以只在特定的时间段内订阅吗?我需要使用 Eclipse Paho 客户端(mqttv3)订阅特定主题一段时间(例如 15 分钟),并读取这段时间内收到的 MQTT 消息。下面是我尝试过的代码。
/**
 * Configures {@code mqttConnectOptions}, creates {@code mqttClient} and installs the
 * callback that collects incoming readings.
 *
 * <p>Options applied: clean session off, automatic reconnect off, SSL socket factory built
 * from the configured CA certificate / client certificate / client key, keep-alive interval
 * 300 and connection timeout 300. This method does not connect to the broker itself.
 *
 * <p>Failures are logged and not rethrown; in that case {@code mqttClient} may be left
 * without a callback (or unassigned), depending on where the failure occurred.
 */
private void initializeConnectionOptions() {
    try {
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setAutomaticReconnect(false);
        mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(this.caCrt, this.clientCrt, this.clientKey));
        mqttConnectOptions.setKeepAliveInterval(300);
        mqttConnectOptions.setConnectionTimeout(300);
        mqttClient = new MqttClient("ssl://IP:port", "clientID", memoryPersistence);
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                // Automatic reconnect is disabled above, so without this log a dropped
                // connection would go completely unnoticed.
                LOGGER.warn("Connection to Mqtt broker lost", cause);
            }

            /**
             * Parses the payload as JSON and, when it carries the "Attribute" key, records
             * the attribute value and the source timestamp under the message's device name.
             * Values already recorded for that device are not added again.
             *
             * <p>NOTE(review): per the Paho MqttCallback documentation, an exception thrown
             * from this method shuts the client down, so a malformed payload (invalid JSON
             * or a missing "sourceTimestamp"/"deviceName") ends the subscription.
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String attribute = "Attribute";
                JSONObject json = new JSONObject(message.toString());
                LOGGER.info("json value is {}", json);
                // Test for the key itself; a substring search over the serialized JSON also
                // matches when the word merely appears inside some other key or value.
                if (!json.has(attribute)) {
                    return;
                }
                Integer value = json.getInt(attribute);
                Long sourceTimestamp = json.getLong("sourceTimestamp");
                String deviceName = json.getString("deviceName");

                appendUnique(nodeValueWithDevice, deviceName, value);
                appendUnique(sourceTimestampWithDevice, deviceName, sourceTimestamp);

                LOGGER.info(" map of source time stamp is :::{}", sourceTimestampWithDevice);
                LOGGER.info(" map of value is :::{}", nodeValueWithDevice);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Nothing to do here: this block only sets up message reception.
            }
        });
    } catch (MqttException me) {
        LOGGER.error("Error while connecting to Mqtt broker. Error message {} Error code {}",
                me.getMessage(), me.getReasonCode(), me);
    } catch (NoSuchAlgorithmException e) {
        LOGGER.error("Error while initializing Mqtt connection options. Error message {}", e.getMessage(), e);
    }
}

/**
 * Appends {@code value} to the list stored under {@code key}, creating the list on first
 * use and skipping values that the list already contains.
 */
private <T> void appendUnique(Map<String, List<T>> valuesByKey, String key, T value) {
    List<T> values = valuesByKey.get(key);
    if (values == null) {
        values = new ArrayList<T>();
        valuesByKey.put(key, values);
    }
    if (!values.contains(value)) {
        values.add(value);
    }
}
/**
 * Connects to the broker and subscribes to the output topic derived from
 * {@code inputTopic} with QoS 1.
 *
 * <p>Any failure while connecting or subscribing is logged and not rethrown, so callers
 * cannot tell from this method alone whether the subscription succeeded.
 *
 * @param inputTopic topic name passed to {@code getOutputTopic} to obtain the topic filter
 */
public void subscription(String inputTopic) {
    try {
        connectToBroker();
        mqttClient.subscribe(getOutputTopic(inputTopic), 1);
        LOGGER.info("subscription is done::::");
    } catch (Exception e) {
        // Pass the exception itself: the message has no placeholder, so the previous
        // e.getMessage() argument was dropped, and printStackTrace() bypassed the logger.
        LOGGER.error("Error while subscribing message to broker", e);
    }
}