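I am trying to figure out how to receive multiple messages with the HiveMQ client inside the same try/catch block, even when using different clients. I followed this example: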


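The example above works for one client with a single publish and subscribe, but if possible I would like to perform several of these operations within the same try/catch block.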

package com.main;

import java.util.UUID;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.util.logging.Logger;
import java.util.NoSuchElementException;

import java.util.logging.Level;
import java.util.concurrent.TimeUnit;

public class Main {

    private static final Logger LOGGER = Logger.getLogger(Main.class.getName());  // Creates a logger instance 

    public static void main(String[] args) {

        Mqtt5BlockingClient client1 = Mqtt5Client.builder()
                .identifier(UUID.randomUUID().toString())  // unique identifier of the MQTT client; here a randomly generated UUID
                .serverHost("localhost")  // host name or IP address of the MQTT server; kept for testing, "localhost" is the default if not specified
                .serverPort(1883)  // port of the MQTT server
                .buildBlocking();  // builds the blocking client

        client1.connect();  // connects the client
        System.out.println("Client1 Connected");

        Mqtt5BlockingClient client2 = Mqtt5Client.builder()
                .identifier(UUID.randomUUID().toString())  // unique identifier of the MQTT client; here a randomly generated UUID
                .serverHost("localhost")  // host name or IP address of the MQTT server; kept for testing, "localhost" is the default if not specified
                .serverPort(1883)  // port of the MQTT server
                .buildBlocking();  // builds the blocking client

        client2.connect();  // connects the client
        System.out.println("Client2 Connected");

        String testmessage = "How is it going!";
        byte[] messagebytesend = testmessage.getBytes();  // stores the message as a byte array to be used as the payload

        try {
            // Creates a "publishes" instance that queues the incoming messages of client1.
            // MqttGlobalPublishFilter.ALL matches all incoming Publish messages.
            Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL);

            client1.subscribeWith()  // creates a subscription
                    .topicFilter("test/something1/topic")  // receive messages only on this topic (# = multi-level wildcard, + = single-level wildcard)
                    .qos(MqttQos.AT_LEAST_ONCE)  // sets the QoS to 1 (at least once)
                    .send();
            System.out.println("The client1 has subscribed");

            client1.publishWith()  // publishes the message
                    .topic("test/something1/topic")  // publishes to the specified topic
                    .payload(messagebytesend)  // the contents of the message
                    .send();
            System.out.println("The client1 has published");

            // Receives a message from the "publishes" instance, waiting up to 5 seconds.
            // .get() returns the message if one is available or throws a NoSuchElementException.
            Mqtt5Publish receivedMessage = publishes.receive(5, TimeUnit.SECONDS).get();

            byte[] tempdata = receivedMessage.getPayloadAsBytes();  // reads the payload of the message as a byte array
            String getdata = new String(tempdata);  // converts the byte array to a String
            System.out.println(getdata);

            client2.subscribeWith()  // creates a subscription
                    .topicFilter("test/something2/topic")  // receive messages only on this topic (# = multi-level wildcard, + = single-level wildcard)
                    .qos(MqttQos.AT_LEAST_ONCE)  // sets the QoS to 1 (at least once)
                    .send();
            System.out.println("The client2 has subscribed");

            client2.publishWith()  // publishes the message
                    .topic("test/something2/topic")  // publishes to the specified topic
                    .payload("The second message :P".getBytes())  // the contents of the message
                    .send();
            System.out.println("The client2 has published");

            // VV   Why isn't the publish instance below receiving the second message? Do I need another try catch?  VV

            // Receives a message from the "publishes" instance, waiting up to 5 seconds.
            // .get() returns the message if one is available or throws a NoSuchElementException.
            receivedMessage = publishes.receive(5, TimeUnit.SECONDS).get();

            byte[] tempdata2 = receivedMessage.getPayloadAsBytes();  // reads the payload of the message as a byte array
            getdata = new String(tempdata2);  // converts the byte array to a String
            System.out.println(getdata);
        } catch (InterruptedException e) {  // the thread was interrupted while waiting
            LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
        } catch (NoSuchElementException e) {  // the "publishes" instance had no message within the timeout
            System.out.println("There are no received messages");
        }

        client1.disconnect();
        System.out.println("Client1 Disconnected");

        client2.disconnect();
        System.out.println("Client2 Disconnected");
    }
}



Client1 Connected

Client2 Connected

The client1 has subscribed

The client1 has published


The client2 has subscribed



Client1 Disconnected

Client2 Disconnected


Client1 Connected

Client2 Connected

The client1 has subscribed

The client1 has published


The client2 has subscribed



Client1 Disconnected

Client2 Disconnected


I ran your code and found this WARN log:

2019-06-11 20:32:22,774 WARN  - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.

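It looks like you forgot to set up a publish filter for the second client. In the code where you wait for the second message (the one for client2), you are actually checking the message flow of client1. So you only need to add a publish filter for client2: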

Mqtt5Publishes publishesClient2 = client2.publishes(MqttGlobalPublishFilter.ALL);

Then wait for the message on client2:

// VV   Why isn't the publish instance below receiving the second message? Do i need another try catch?  VV

     receivedMessage = publishesClient2.receive(5,TimeUnit.SECONDS).get(); 
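
To illustrate why the second `receive` came back empty, here is a minimal sketch that uses only the Java standard library. It is a toy model, not the HiveMQ API: the class `PerClientQueues`, its methods, and the client names are hypothetical stand-ins. It assumes each client has its own inbound queue and that a message lands only in the queue of the client subscribed to its topic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy stand-in for a broker plus blocking clients; NOT the HiveMQ API.
class PerClientQueues {

    // One inbound queue per (hypothetical) client name.
    private final Map<String, BlockingQueue<String>> inbound = new HashMap<>();
    // topic -> subscribed client (one subscriber per topic keeps the sketch short)
    private final Map<String, String> subscriptions = new HashMap<>();

    public void subscribe(String client, String topic) {
        inbound.computeIfAbsent(client, c -> new LinkedBlockingQueue<>());
        subscriptions.put(topic, client);
    }

    // Delivers the payload only to the queue of the client subscribed to the topic.
    public void publish(String topic, String payload) {
        String subscriber = subscriptions.get(topic);
        if (subscriber != null) {
            inbound.get(subscriber).add(payload);
        }
    }

    // Timed receive: an empty Optional means nothing arrived for THIS client in time.
    public Optional<String> receive(String client, long timeout, TimeUnit unit) {
        BlockingQueue<String> queue = inbound.get(client);
        if (queue == null) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(queue.poll(timeout, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt flag
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        PerClientQueues broker = new PerClientQueues();
        broker.subscribe("client1", "test/something1/topic");
        broker.subscribe("client2", "test/something2/topic");
        broker.publish("test/something1/topic", "How is it going!");
        broker.publish("test/something2/topic", "The second message :P");

        // First message is in client1's queue.
        System.out.println(broker.receive("client1", 100, TimeUnit.MILLISECONDS));
        // Asking client1 again finds nothing: the second message went to client2.
        System.out.println(broker.receive("client1", 100, TimeUnit.MILLISECONDS));
        // Asking client2 finds it.
        System.out.println(broker.receive("client2", 100, TimeUnit.MILLISECONDS));
    }
}
```

In this model the middle call returns an empty `Optional`, which is the situation in which `.get()` throws `NoSuchElementException` in the question's code.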



Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published

How is it going!
The client2 has subscribed
The client2 has published

2019-06-11 20:46:36,537 WARN  - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
There are no received messages
Client1 Disconnected
Client2 Disconnected


Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published

How is it going!
The client2 has subscribed
The client2 has published

The second message :P
Client1 Disconnected
Client2 Disconnected

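EDIT: I hope this is the solution you are looking for, since the desired output differs from the output I get with the fix: the NoSuchElementException is no longer thrown/caught, so the "There are no received messages" line is missing after the second message.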
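
As a side note on the `NoSuchElementException` path, the following sketch shows two ways of handling an `Optional` that may be empty after a timed receive. It uses plain `Optional<String>` values rather than HiveMQ types, and the class and method names are hypothetical.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

class OptionalHandling {

    // Style used in the question: get() throws NoSuchElementException when the Optional is empty.
    static String describeWithGet(Optional<String> received) {
        try {
            return received.get();
        } catch (NoSuchElementException e) {
            return "There are no received messages";
        }
    }

    // Alternative with the same result but without exception-driven control flow.
    static String describeWithOrElse(Optional<String> received) {
        return received.orElse("There are no received messages");
    }

    public static void main(String[] args) {
        Optional<String> arrived = Optional.of("The second message :P");
        Optional<String> timedOut = Optional.empty();

        System.out.println(describeWithGet(arrived));
        System.out.println(describeWithGet(timedOut));
        System.out.println(describeWithOrElse(arrived));
        System.out.println(describeWithOrElse(timedOut));
    }
}
```

Both methods return the payload when one is present and the fallback text otherwise; which style to use is a matter of preference.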

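EDIT in response to the comment: a snippet for collecting the publish messages of client2 with the async flavour (just replace the code in the try block with the following code):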

            // The list where we put our received publish messages
            // (needs: import java.util.LinkedList; import java.util.List;)
            final List<Mqtt5Publish> incomingMessagesClient2 = new LinkedList<>();

            // With the async flavour we can add a consumer for the incoming publish messages
            client2.toAsync().publishes(MqttGlobalPublishFilter.ALL, mqtt5Publish -> incomingMessagesClient2.add(mqtt5Publish));

            client1.publishes(MqttGlobalPublishFilter.ALL);  // creates a "publishes" instance that's used to queue incoming messages

            client2.subscribeWith()  // creates a subscription
                    .topicFilter("test/something1/topic")  // receive messages only on this topic (# = multi-level wildcard, + = single-level wildcard)
                    .qos(MqttQos.AT_LEAST_ONCE)  // sets the QoS to 1 (at least once)
                    .send();
            System.out.println("The client2 has subscribed");

            client1.publishWith()  // publishes the message
                    .topic("test/something1/topic")  // publishes to the specified topic
                    .payload(messagebytesend)  // the contents of the message
                    .send();
            System.out.println("The client1 has published");

            client1.publishWith()  // publishes the message
                    .topic("test/something1/topic")  // publishes to the specified topic
                    .payload("The second message :P".getBytes())  // the contents of the message
                    .send();
            System.out.println("The client1 has published");

            // Messages arrive asynchronously, so wait briefly before printing them
            // (the 500 ms value is an arbitrary choice for this test).
            TimeUnit.MILLISECONDS.sleep(500);

            incomingMessagesClient2.forEach(mqtt5Publish -> System.out.println(new String(mqtt5Publish.getPayloadAsBytes())));
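
When messages are collected from a callback that runs on another thread, it can help to use a thread-safe list and wait for an expected number of messages instead of relying on timing. The sketch below shows that idea with the Java standard library only. It is not HiveMQ code: `AsyncCollect`, `collect`, and the executor that plays the role of the client's delivery thread are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncCollect {

    // Collects messages delivered on another thread, waiting until `expected`
    // messages have arrived or `timeoutMillis` has elapsed, whichever is first.
    static List<String> collect(List<String> toDeliver, int expected, long timeoutMillis) {
        List<String> incoming = new CopyOnWriteArrayList<>();  // safe to fill from another thread
        CountDownLatch latch = new CountDownLatch(expected);

        // The consumer plays the role of the callback registered for incoming publishes.
        Consumer<String> callback = message -> {
            incoming.add(message);
            latch.countDown();
        };

        // A single-threaded executor stands in for the client's delivery thread
        // and keeps the messages in submission order.
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        for (String message : toDeliver) {
            deliveryThread.execute(() -> callback.accept(message));
        }

        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt flag
        } finally {
            deliveryThread.shutdown();
        }
        return new ArrayList<>(incoming);  // snapshot of what has arrived so far
    }

    public static void main(String[] args) {
        List<String> received = collect(
                Arrays.asList("How is it going!", "The second message :P"), 2, 1000);
        received.forEach(System.out::println);
    }
}
```

The latch returns as soon as both messages are in, so the caller does not need a fixed sleep; the timeout only bounds the wait when fewer messages arrive than expected.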


Michael from the HiveMQ team

Answered 2019-06-11T18:49:09.407