1

我是 MQTT 的新手,我有一些问题希望你们能帮助我。我正在做一个需要我使用 MQTT 协议的项目,并且程序需要用 java 编写(只是一些背景信息)

MQTT 客户端可以订阅特定的时间间隔吗?我需要使用 eclipse paho 客户端 mqttv3 阅读 mqtt 消息并订阅特定主题一段时间(例如 15 分钟)并阅读这些 mqtt 消息。请在下面找到我尝试过的代码。

private void initializeConnectionOptions() {
    try {
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setAutomaticReconnect(false);
        mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(this.caCrt, this.clientCrt, this.clientKey));
        mqttConnectOptions.setKeepAliveInterval(300);
        mqttConnectOptions.setConnectionTimeout(300);
        mqttClient = new MqttClient("ssl://IP:port", "clientID", memoryPersistence);
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {

            }
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String attribute = "Attribute";
                JSONObject json = new JSONObject(message.toString());
                LOGGER.info("json value is "+ json.toString());
                if (json.toString().contains(attribute)) {
                    int value = json.getInt(attribute);
                    Long sourceTimestamp = json.getLong("sourceTimestamp");
                    String deviceName = json.getString("deviceName");
                    String deviceType = json.getString("deviceType");
                    if (!nodeValueWithDevice.containsKey(deviceName)) {
                        List<Integer> attributeValue = new ArrayList<Integer>();
                        if (!attributeValue.contains(value)) {
                            attributeValue.add(value);
                        }
                        nodeValueWithDevice.put(deviceName, attributeValue);
                    } else {
                        List<Integer> temList = nodeValueWithDevice.get(deviceName);
                        if (!temList.contains(value)) {
                            temList.add(value);
                        }
                        nodeValueWithDevice.put(deviceName, temList);
                    }
                    if (!sourceTimestampWithDevice.containsKey(deviceName)) {
                        List<Long> Time = new ArrayList<Long>();
                        if (!Time.contains(sourceTimestamp)) {
                            Time.add(sourceTimestamp);
                        }
                        sourceTimestampWithDevice.put(deviceName, Time);
                    } else {
                        List<Long> tempList2 = sourceTimestampWithDevice.get(deviceName);
                        if (!tempList2.contains(sourceTimestamp)) {
                            tempList2.add(sourceTimestamp);
                        }
                        sourceTimestampWithDevice.put(deviceName, tempList2);
                    }
                    LOGGER.info(" map of source time stamp is :::" + sourceTimestampWithDevice);
                    LOGGER.info(" map of value is :::" + nodeValueWithDevice);
                }
            }
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
    } catch (MqttException | NoSuchAlgorithmException me) {
        LOGGER.error("Error while connecting to Mqtt broker. Error message {} Error code {}", me.getMessage());
    }
}
public void subscription(String inputTopic) {
    try {
        connectToBroker();
        mqttClient.subscribe(getOutputTopic(inputTopic), 1);
        LOGGER.info("subscription is done::::");
    } catch (Exception e) {
        LOGGER.error("Error while subscribing message to broker", e.getMessage());
        e.printStackTrace();
    }
}
4

2 回答 2

0

不,客户端都设计为在客户端连接的整个生命周期内接收所有消息。

如果您只想订阅给定的持续时间,则由您决定找到一种在该时间过去时收到通知并明确断开客户端连接的方法。

于 2021-04-15T18:17:35.573 回答
0

根据v5.0v3.1.1的 MQTT 规范,没有指定方式仅订阅固定时间间隔的主题。但是,这可以通过您的应用程序逻辑来完成。

在您的情况下,假设您完全控制客户端,您可以订阅某个主题,跟踪连接时间,然后在 15 分钟(或您指定的任何时间间隔)后为该主题发送一个 UNSUBSCRIBE 数据包。

于 2021-04-16T14:20:58.423 回答