3

我有一个 Web 服务器,它正在向同一个主题获取大量消息,并将响应消息返回到另一个主题。

我目前正在通过保持 MQTT 客户端始终连接来重复使用相同的 MQTT 客户端实例来进行回调和发送响应消息。

但是,在接收消息和发送响应的一个周期之后,我能够接收另一条消息但无法发送响应 - 我必须重新启动应用程序服务器。

拥有单个 MQTTclient 实例是一种好方法吗?可以一直保持连接吗?这种要求的最佳方法是什么?

这是我的代码:

public static void registerCallBack(String topicName, String userName,
        String password, String clientId, MqttCallback callback,
        MqttClient client) {
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);
    options.setKeepAliveInterval(30);
    options.setUserName(userName);
    options.setPassword(password.toCharArray());

    // Connect to Broker
    try {
        options.setSocketFactory(SslUtil.getSocketFactory(
                ManagerProps.MQTT_BROKER_CA_FILE.getValue(), ""));

        client.setCallback(callback);
        client.connect(options);
        client.subscribe(topicName, 0);
        log.info("successfuly registered callback to topic " + topicName);
    } catch (MqttException me) {
        log.error("MqttException, " + me);
    } catch (Exception e) {
        log.error("Exception, " + e);
    }
}

public static String publishMessage(MqttClient client, String message,
        String topic, String userName, String password) {
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);
    options.setKeepAliveInterval(30);
    options.setUserName(userName);
    options.setPassword(password.toCharArray());

    try {
        MqttMessage msg = new MqttMessage();
        msg.setPayload(message.getBytes());
        client.publish(topic, msg);
    } catch (MqttException e) {
        log.error("MqttException, " + e);
    } catch (Exception e) {
        log.error("Exception, " + e);
    }

    return message;
}
4

1 回答 1

0

我看到了类似的东西,并得到了这个工作:

final CallbackConnection connection = mqtt.callbackConnection();
    connection.listener(new org.fusesource.mqtt.client.Listener() {

        public void onConnected() {
        }
        public void onDisconnected() {
        }
        public void onFailure(Throwable value) {
            value.printStackTrace();
            System.exit(-2);
        }
        public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
            String body = msg.utf8().toString();
            if( body.startsWith("REPLY: ")) {     
                // Don't reply to your own reply             
                System.out.println("Replied");
                System.out.println("");
            } else {                    
              try{            
              byte[] reply = "REPLY: Hello Back".getBytes();
              connection.publish(destination, reply, QoS.AT_MOST_ONCE, true, null)  ;
              msg.clear();
            }catch (Exception e){
               e.printStackTrace();
            }  
            }
        }
    });
于 2014-02-07T17:51:16.057 回答