0

我已经通过 Docker 部署了 RabbitMQ,启用了用于管理的 Web UI 和 mqtt_plugin,并开放了端口 1883、8883、5672、15672。我在正在开发的 Android 应用中使用 Paho MqttClient 向该 MQ 代理发布消息。连接正常,但无论是在 Web UI 还是在 CLI 中检查,都看不到任何已收到的消息。

rabbitmqctl list_queues

连接页面: 活跃的发布者

频道页面: 频道页面

交换机页面: 活跃的交换机

队列页面: 可用队列

下面是我正在处理的代码。

// Broker endpoint: plain TCP on port 1883.
private static final String CONNECTION_URL = "tcp://my-app.com:1883";
// Credentials handed to the MQTT connect options.
private static final String USERNAME = "test_user";
private static final String PASSWORD = "test_pass";
// NOTE(review): EXCHANGE and QUEUE are not referenced by any of the code shown here.
private static final String EXCHANGE = "TestExchange";
private static final String QUEUE = "TestQueue";
// MQTT topic that publishLog() publishes to.
private static final String TOPIC = "TestTopic";

// executed onCreate
/**
 * Creates the MQTT client for {@code CONNECTION_URL} and starts an asynchronous
 * connection using the options built by {@code setConnectionOptions}.
 *
 * The outcome is reported through the log only: "Connected" on success, or the
 * failure together with its cause.
 */
private void initializeMQ() {
        Log.d(TAG, "==== STARTING MQTT CONNECTION ====");

        String clientId = "Skwamiyou";
        client = new MqttAndroidClient(this, CONNECTION_URL, clientId);
        MqttConnectOptions options = setConnectionOptions(USERNAME, PASSWORD);

        try {
            // Pass the listener to connect() itself so it is registered before the
            // attempt starts, rather than attaching it to the returned token afterwards.
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(TAG, "Connected");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // Keep the cause: without it a refused login and an unreachable
                    // host look identical in the log.
                    Log.e(TAG, "Failed connection", exception);
                }
            });
        } catch (Exception e) {
            // Still best-effort (nothing is rethrown), but routed through the app log.
            Log.e(TAG, "Could not start MQTT connection", e);
        }
}

/**
 * Builds the connect options used for the broker connection.
 *
 * @param username user name sent to the broker
 * @param password password sent to the broker (converted to a char array)
 * @return options requesting MQTT 3.1, a non-clean session and automatic reconnect
 */
private static MqttConnectOptions setConnectionOptions(String username, String password) {
        final MqttConnectOptions connectOptions = new MqttConnectOptions();

        // Credentials.
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());

        // Protocol and session behaviour.
        connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
        connectOptions.setCleanSession(false);
        connectOptions.setAutomaticReconnect(true);

        return connectOptions;
}

// this is called on button click publish
/**
 * Publishes the next numbered test message to {@code TOPIC} with QoS 1 and the
 * retained flag set.
 *
 * The "MESSAGE SENT!" toast is shown only once the client reports that the publish
 * succeeded; a failed publish is logged with its cause instead of being reported
 * as sent.
 */
public void publishLog() {
        Log.d(TAG, "Publishing....");

        counter++;
        // Snapshot the counter so the callbacks refer to the message they belong to,
        // even if another publish is triggered before this one completes.
        final int sequence = counter;
        String payload = "Send to My MQ! - " + sequence;

        // Built here, where "this" is still the enclosing context; shown on success only.
        final Toast sentToast = Toast.makeText(this, "MESSAGE SENT! - " + sequence, Toast.LENGTH_SHORT);

        try {
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(1);
            message.setRetained(true);
            client.publish(TOPIC, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    sentToast.show();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, "Publish failed - " + sequence, exception);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Could not publish - " + sequence, e);
        }
}

我一直在寻找答案并尝试重新安装 MQ,但还是一样。

4

1 回答 1

0

下面是一个通用的扩展函数 connectMq,用于连接 MQ 并从中接收消息。(MqConnectionExtention.kt)

    /**
     * Connects a new [MqttAndroidClient] to the broker and subscribes to
     * [publishTopicChannelName] at QoS 0.
     *
     * The connection uses automatic reconnect and a non-clean session. Every message
     * delivered to the subscription is forwarded to [onConnectionSuccess]. The client is
     * local to this function and is not returned, so callers cannot disconnect it.
     *
     * @param publishTopicChannelName topic to subscribe to
     * @param onConnectionSuccess invoked with the topic and message of each delivery
     */
    fun Context.connectMq(publishTopicChannelName: String, onConnectionSuccess: (topic: String?, message: MqttMessage?) -> Unit) {

        val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
        val mqttAndroidClient = MqttAndroidClient(this, "tcp://34.212.00.188:1883", mClientId)

        Timber.e("ChannelName:$publishTopicChannelName")

        // Single place that performs the subscription; used after the initial connect
        // and again after an automatic reconnect.
        fun subscribeToTopic() {
            try {
                mqttAndroidClient.subscribe(publishTopicChannelName, 0, object : IMqttMessageListener {
                    override fun messageArrived(topic: String?, message: MqttMessage?) {
                        onConnectionSuccess(topic, message)
                    }
                })
            } catch (ex: MqttException) {
                System.err.println("Exception whilst subscribing")
                ex.printStackTrace()
            }
        }

        mqttAndroidClient.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String) {
                if (reconnect) {
                    Log.e("TAG", "Reconnected to : $serverURI")
                    // The session is requested as non-clean below; the subscription is
                    // re-issued here anyway, as the original code did.
                    subscribeToTopic()
                } else {
                    Log.e("TAG", "Connected to: $serverURI")
                }
            }

            override fun connectionLost(cause: Throwable) {
                Log.e("TAG", "The Connection was lost.")
            }

            override fun messageArrived(topic: String, message: MqttMessage) {
                // Decode the bytes: ByteArray.toString() would log only the array identity.
                Log.e("TAG", "Incoming message: " + String(message.payload))
            }

            override fun deliveryComplete(token: IMqttDeliveryToken) {}
        })

        // NOTE(review): these are literal strings, not BuildConfig fields - confirm they
        // are placeholders for the real credentials.
        val mqttConnectOptions = setUpConnectionOptions("MQ_CONNECTION_USERNAME", "MQ_CONNECTION_PASSWORD")
        mqttConnectOptions.isAutomaticReconnect = true
        mqttConnectOptions.isCleanSession = false

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    val disconnectedBufferOptions = DisconnectedBufferOptions()
                    disconnectedBufferOptions.isBufferEnabled = true
                    disconnectedBufferOptions.bufferSize = 100
                    disconnectedBufferOptions.isPersistBuffer = false
                    disconnectedBufferOptions.isDeleteOldestMessages = false
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                    subscribeToTopic()
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    // Previously empty: a failed connection left no trace at all.
                    Log.e("TAG", "Failed to connect to the MQTT broker", exception)
                }
            })
        } catch (ex: MqttException) {
            ex.printStackTrace()
        }
    }

/**
 * Builds [MqttConnectOptions] carrying the given credentials, with clean session enabled.
 *
 * @param username user name sent to the broker
 * @param password password sent to the broker (converted to a char array)
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions =
    MqttConnectOptions().apply {
        isCleanSession = true
        userName = username
        // Qualified with "this": a bare "password" would resolve to the parameter.
        this.password = password.toCharArray()
    }

从 Java 类中,我像下面这样调用它并成功获取消息:

/**
 * Subscribes to "mq_video_channel_name" through the Kotlin connectMq extension and
 * logs every message that arrives on it.
 */
private void subscribeMQForVideo() {
    MqConnectionExtentionKt.connectMq(mContext, "mq_video_channel_name", (topic, mqttMessage) -> {
        // A message arrived: decode its payload and log it with the topic.
        String body = new String(mqttMessage.getPayload());
        Log.e("TAG", "Message Video: " + topic + " : " + body);

        // The Kotlin callback is declared as returning Unit; from Java that is satisfied with null.
        return null;
    });
}

为了发布消息,我创建了类似的扩展,几乎没有什么区别。(MqConnectionPublishExtention.kt)

    /**
     * Connects a new [MqttAndroidClient] to [BuildConfig.MQ_SERVER_URI] and hands it to
     * [onConnectionSuccess] so the caller can publish with it.
     *
     * The connection uses automatic reconnect and a non-clean session, and buffers up to
     * 100 messages while disconnected. [onConnectionSuccess] is invoked once when the
     * initial connect succeeds and again on every automatic reconnect, so the callback
     * must tolerate being run more than once.
     *
     * @param onConnectionSuccess receives the connected client
     */
    fun Context.connectMq(onConnectionSuccess: (mqttAndroidClient: MqttAndroidClient?) -> Unit) {

        val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
        val mqttAndroidClient = MqttAndroidClient(this, BuildConfig.MQ_SERVER_URI, mClientId)

        mqttAndroidClient.setCallback(object : MqttCallbackExtended {
            override fun connectComplete(reconnect: Boolean, serverURI: String) {
                if (reconnect) {
                    Log.e("TAG", "Reconnected to : $serverURI")
                    // Nothing is subscribed here; the caller is simply notified again
                    // that a usable connection exists.
                    onConnectionSuccess(mqttAndroidClient)
                } else {
                    Log.e("TAG", "Connected to: $serverURI")
                }
            }

            override fun connectionLost(cause: Throwable) {
                Log.e("TAG", "The Connection was lost.")
            }

            override fun messageArrived(topic: String, message: MqttMessage) {
                // Decode the bytes: ByteArray.toString() would log only the array identity.
                Log.e("TAG", "Incoming message: " + String(message.payload))
            }

            override fun deliveryComplete(token: IMqttDeliveryToken) {}
        })

        val mqttConnectOptions = setUpConnectionOptions(BuildConfig.MQ_CONNECTION_USERNAME, BuildConfig.MQ_CONNECTION_PASSWORD)
        mqttConnectOptions.isAutomaticReconnect = true
        mqttConnectOptions.isCleanSession = false

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    val disconnectedBufferOptions = DisconnectedBufferOptions()
                    disconnectedBufferOptions.isBufferEnabled = true
                    disconnectedBufferOptions.bufferSize = 100
                    disconnectedBufferOptions.isPersistBuffer = false
                    disconnectedBufferOptions.isDeleteOldestMessages = false
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                    onConnectionSuccess(mqttAndroidClient)
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    // Previously empty: a failed connection left no trace at all.
                    Log.e("TAG", "Failed to connect to the MQTT broker", exception)
                }
            })
        } catch (ex: MqttException) {
            ex.printStackTrace()
        }
    }

/**
 * Creates the base [MqttConnectOptions]: clean session enabled plus the supplied credentials.
 *
 * @param username user name sent to the broker
 * @param password password sent to the broker (converted to a char array)
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions {
    val secret = password.toCharArray()
    return MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = secret
    }
}

从 java 类发布消息

/**
 * Connects through the Kotlin publish extension and, once a client is handed back,
 * publishes the current exercise data as a JSON payload to "Channel_name".
 *
 * JSON and MQTT errors are printed to stderr; nothing is rethrown.
 */
private void publishExerciseDataToMQChannel() {
    MqConnectionPublishExtentionKt.connectMq(mContext, (mqttAndroidClient) -> {
        try {
            // Assemble the JSON body.
            JSONObject body = new JSONObject();
            body.put("params", mlParams);
            body.put("workoutid", workoutId);
            body.put("userid", model.getUserIdFromPrefs());
            body.put("stream_id", streamDataModel.getStreamId());

            // Wrap it in an MQTT message and publish.
            MqttMessage outgoing = new MqttMessage();
            outgoing.setPayload(body.toString().getBytes());
            mqttAndroidClient.publish("Channel_name", outgoing);

            Log.e("TAG", outgoing.getQos() + "");
            if (!mqttAndroidClient.isConnected()) {
                Log.e("TAG", mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException | JSONException e) {
            // Both failure kinds are reported identically.
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
        // The Kotlin callback is declared as returning Unit; from Java that is satisfied with null.
        return null;
    });
}
于 2020-08-11T08:25:32.903 回答