下面是 connectMq 的通用扩展函数,用于连接 MQ 并从中接收消息。(MqConnectionExtention.kt)
/**
 * Connects to the MQTT broker and subscribes to [publishTopicChannelName] at QoS 0.
 *
 * Every message received on the topic is forwarded to [onConnectionSuccess]. The subscription is
 * (re-)issued both when the initial connect succeeds and after every automatic reconnect.
 * Connection and subscription failures are logged; they are not propagated to the caller.
 *
 * @param publishTopicChannelName topic to subscribe to.
 * @param onConnectionSuccess invoked with the topic and message for each message that arrives.
 */
fun Context.connectMq(
    publishTopicChannelName: String,
    onConnectionSuccess: (topic: String?, message: MqttMessage?) -> Unit
) {
    // A fresh client id is generated on every call, so each call opens its own broker session.
    val clientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
    // NOTE(review): the broker URI is hard-coded here — confirm whether it should come from
    // build configuration instead.
    val mqttAndroidClient = MqttAndroidClient(this, "tcp://34.212.00.188:1883", clientId)
    Timber.e("ChannelName:$publishTopicChannelName")

    // Single place that issues the subscription; used after connect and after reconnect.
    fun subscribeToTopic() {
        try {
            mqttAndroidClient.subscribe(publishTopicChannelName, 0, object : IMqttMessageListener {
                override fun messageArrived(topic: String?, message: MqttMessage?) {
                    onConnectionSuccess(topic, message)
                }
            })
        } catch (ex: MqttException) {
            Log.e("TAG", "Exception whilst subscribing", ex)
        }
    }

    mqttAndroidClient.setCallback(object : MqttCallbackExtended {
        override fun connectComplete(reconnect: Boolean, serverURI: String) {
            if (reconnect) {
                Log.e("TAG", "Reconnected to : $serverURI")
                // Clean session is disabled below, but the per-topic message listener is
                // registered again here so delivery to the callback resumes after a reconnect.
                subscribeToTopic()
            } else {
                Log.e("TAG", "Connected to: $serverURI")
            }
        }

        // The cause is nullable: the Paho Android client reports a user-initiated disconnect
        // with a null cause, which would crash a non-null Kotlin parameter.
        override fun connectionLost(cause: Throwable?) {
            Log.e("TAG", "The Connection was lost.")
        }

        override fun messageArrived(topic: String, message: MqttMessage) {
            // Decode the bytes; ByteArray.toString() would only print the array identity.
            Log.e("TAG", "Incoming message: " + String(message.payload, Charsets.UTF_8))
        }

        override fun deliveryComplete(token: IMqttDeliveryToken) {}
    })

    // NOTE(review): these look like placeholder literals rather than real credentials — verify.
    val mqttConnectOptions =
        setUpConnectionOptions("MQ_CONNECTION_USERNAME", "MQ_CONNECTION_PASSWORD").apply {
            isAutomaticReconnect = true
            isCleanSession = false
        }

    try {
        mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken?) {
                val bufferOptions = DisconnectedBufferOptions().apply {
                    isBufferEnabled = true
                    bufferSize = 100
                    isPersistBuffer = false
                    isDeleteOldestMessages = false
                }
                mqttAndroidClient.setBufferOpts(bufferOptions)
                subscribeToTopic()
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                // Previously swallowed silently; surface the failure in the log.
                Log.e("TAG", "Failed to connect to the MQTT broker", exception)
            }
        })
    } catch (ex: MqttException) {
        Log.e("TAG", "Exception whilst connecting", ex)
    }
}
/**
 * Builds the [MqttConnectOptions] used to authenticate against the broker.
 *
 * Clean session is switched on here; a caller may change the flag on the returned object.
 *
 * @param username user name sent to the broker.
 * @param password password sent to the broker, converted to a char array.
 * @return a new options object carrying the credentials.
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions =
    MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = password.toCharArray()
    }
从 Java 类中,我像下面这样调用它并成功获取消息:
/**
 * Subscribes to the video MQ channel and logs every message that arrives on it.
 */
private void subscribeMQForVideo() {
    MqConnectionExtentionKt.connectMq(mContext, "mq_video_channel_name", (topic, mqttMessage) -> {
        // The Kotlin callback declares the message as nullable, so guard before dereferencing.
        if (mqttMessage == null) {
            Log.e("TAG", "Message Video: " + topic + " : <no message>");
            return null;
        }
        // Decode with an explicit charset instead of relying on the platform default.
        String payloadText = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        Log.e("TAG", "Message Video: " + topic + " : " + payloadText);
        return null;
    });
}
为了发布消息,我创建了类似的扩展,几乎没有什么区别。(MqConnectionPublishExtention.kt)
/**
 * Connects to the MQTT broker at [BuildConfig.MQ_SERVER_URI] and hands the connected client to
 * [onConnectionSuccess] so the caller can publish with it.
 *
 * [onConnectionSuccess] is invoked when the initial connect succeeds and again after every
 * automatic reconnect, so a caller that publishes inside the callback will publish again on each
 * reconnect. Connection failures are logged; they are not propagated to the caller.
 *
 * @param onConnectionSuccess receives the connected [MqttAndroidClient].
 */
fun Context.connectMq(onConnectionSuccess: (mqttAndroidClient: MqttAndroidClient?) -> Unit) {
    // A fresh client id is generated on every call, so each call opens its own broker session.
    val clientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
    val mqttAndroidClient = MqttAndroidClient(this, BuildConfig.MQ_SERVER_URI, clientId)

    mqttAndroidClient.setCallback(object : MqttCallbackExtended {
        override fun connectComplete(reconnect: Boolean, serverURI: String) {
            if (reconnect) {
                Log.e("TAG", "Reconnected to : $serverURI")
                // Notify the caller again so it can resume using the re-established connection.
                // NOTE(review): callers that publish in the callback will re-publish here —
                // confirm this is intended.
                onConnectionSuccess(mqttAndroidClient)
            } else {
                Log.e("TAG", "Connected to: $serverURI")
            }
        }

        // The cause is nullable: the Paho Android client reports a user-initiated disconnect
        // with a null cause, which would crash a non-null Kotlin parameter.
        override fun connectionLost(cause: Throwable?) {
            Log.e("TAG", "The Connection was lost.")
        }

        override fun messageArrived(topic: String, message: MqttMessage) {
            // Decode the bytes; ByteArray.toString() would only print the array identity.
            Log.e("TAG", "Incoming message: " + String(message.payload, Charsets.UTF_8))
        }

        override fun deliveryComplete(token: IMqttDeliveryToken) {}
    })

    val mqttConnectOptions = setUpConnectionOptions(
        BuildConfig.MQ_CONNECTION_USERNAME,
        BuildConfig.MQ_CONNECTION_PASSWORD
    ).apply {
        isAutomaticReconnect = true
        isCleanSession = false
    }

    try {
        mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken?) {
                // Buffer up to 100 messages in memory while the client is offline.
                val bufferOptions = DisconnectedBufferOptions().apply {
                    isBufferEnabled = true
                    bufferSize = 100
                    isPersistBuffer = false
                    isDeleteOldestMessages = false
                }
                mqttAndroidClient.setBufferOpts(bufferOptions)
                onConnectionSuccess(mqttAndroidClient)
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                // Previously swallowed silently; surface the failure in the log.
                Log.e("TAG", "Failed to connect to: " + BuildConfig.MQ_SERVER_URI, exception)
            }
        })
    } catch (ex: MqttException) {
        Log.e("TAG", "Exception whilst connecting", ex)
    }
}
/**
 * Creates the [MqttConnectOptions] carrying the broker credentials.
 *
 * Clean session is enabled on the returned object; a caller may change the flag afterwards.
 *
 * @param username user name sent to the broker.
 * @param password password sent to the broker, converted to a char array.
 * @return a new, populated options object.
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions =
    MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = password.toCharArray()
    }
从 Java 类中发布消息:
/**
 * Connects to the broker and publishes the current exercise data as a JSON payload to
 * "Channel_name". The publish runs inside the connection callback.
 */
private void publishExerciseDataToMQChannel() {
    MqConnectionPublishExtentionKt.connectMq(mContext, (mqttAndroidClient) -> {
        // The Kotlin callback declares the client as nullable, so guard before using it.
        if (mqttAndroidClient == null) {
            Log.e("TAG", "Error Publishing: no MQTT client available");
            return null;
        }
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("params", mlParams);
            jsonObject.put("workoutid", workoutId);
            jsonObject.put("userid", model.getUserIdFromPrefs());
            jsonObject.put("stream_id", streamDataModel.getStreamId());

            MqttMessage message = new MqttMessage();
            // Encode with an explicit charset instead of relying on the platform default.
            message.setPayload(jsonObject.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            mqttAndroidClient.publish("Channel_name", message);
            Log.e("TAG", message.getQos() + "");

            // When offline, report how many messages are waiting in the client's buffer.
            if (!mqttAndroidClient.isConnected()) {
                Log.e("TAG", mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException | JSONException e) {
            // Both failure modes are reported identically.
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
        return null;
    });
}