0

我使用 HiveMQ 客户端(MQTT 的开源 Java 实现)编写了一个程序,其中用到两个各自运行在独立线程中的客户端:一个作为发布者,另一个作为订阅者(我知道同一个客户端既可以发布也可以订阅)。我想设计一个测试,让发布者向订阅客户端发送 100 条消息,目标是计算发送并接收全部消息所需的时间。我意识到,要计算接收消息所花的时间,就必须让订阅线程一直等到发布线程准备好发送消息为止。我决定使用 wait() 和 notify(),但始终无法正确实现。我知道两者必须在同一个对象上调用,我也尝试这样做了,但设计得不对。下面附上两个客户端各自 run 方法的代码片段。CommonThread.java 实际上并不是线程,我也没有运行它;我只是想在两个类之间用它来调用 wait() 和 notify(),但我遗漏了某些东西。

HiveMQ:

https://github.com/hivemq/hivemq-community-edition

https://github.com/hivemq/hivemq-mqtt-client

SubMainThread.java:

/**
 * Subscriber side of the timing test.
 *
 * <p>Builds and connects a blocking MQTT 5 client, subscribes to {@code subscriberTopic}, blocks in
 * {@code x.threadCondWait()} until the publisher has signalled readiness, then receives
 * {@code messageNum} messages and prints the elapsed time.
 *
 * <p>The incoming-message queue is closed and the client is disconnected on every exit path,
 * including unexpected runtime exceptions.
 */
public void run() {

    // Creates the client object using the Blocking API

    Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
    .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client; a randomly generated UUID
    .serverHost("localhost")  // the host name or IP address of the MQTT server. Kept it localhost for testing. localhost is default if not specified.
    .serverPort(1883)  // specifies the port of the server
    .addConnectedListener(context -> ClientConnectionRetreiver.printConnected("Subscriber1"))        // prints a string that the client is connected
    .addDisconnectedListener(context -> ClientConnectionRetreiver.printDisconnected("Subscriber1"))  // prints a string that the client is disconnected
    .buildBlocking();  // builds the blocking client
    subscriber.connect();  // connects the client
    ClientConnectionRetreiver.getConnectionInfo(subscriber);   // gets connection info

    // The "publishes" instance queues incoming messages (.ALL = every incoming Publish message).
    // It is AutoCloseable, so try-with-resources releases the queue even when receiving fails.
    try (Mqtt5Publishes receivingClient1 = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {

        subscriber.subscribeWith()
        .topicFilter(subscriberTopic)
        .qos(MqttQos.EXACTLY_ONCE)
        .send();

        PubSubUtility.printSubscribing("Subscriber1");

        System.out.println("Publisher ready to send: " + PubMainThread.readyToSend);

        // Block until the publisher thread signals that it is about to start sending.
        x.threadCondWait();
        System.out.println("Back to the normal execution flow :P");

        startTime = System.currentTimeMillis();
        System.out.println("Timer started");

        for (int i = 1; i <= messageNum; i++) {
            // Waits up to MESSAGEWAITTIME seconds for the next message; .get() on the returned
            // Optional throws NoSuchElementException when nothing arrived in time.
            Mqtt5Publish receivedMessage = receivingClient1.receive(MESSAGEWAITTIME, TimeUnit.SECONDS).get();
            PubSubUtility.convertMessage(receivedMessage);  // Converts a Mqtt5Publish instance to string and prints
        }

        endTime = System.currentTimeMillis();

        finalTime = endTime - startTime;
        // NOTE(review): the two durations are summed numerically before the unit is appended,
        // and PubMainThread's fields are read without synchronization - confirm this is intended.
        System.out.println( finalTime + PubMainThread.finalTime + " milliseconds");
        finalSecTime = TimeUnit.MILLISECONDS.toSeconds(finalTime);

        System.out.println(finalSecTime + PubMainThread.finalSecTime);
    }
    catch (InterruptedException e) {    // Catches interruptions in the thread
        Thread.currentThread().interrupt();  // restore the flag so callers can still observe the interrupt
        LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
    }
    catch (NoSuchElementException e) {
        System.out.println("There are no received messages");   // Handles when a publish instance has no messages
    }
    finally {
        subscriber.disconnect();  // always release the broker connection
    }

}

PubMainThread.java:

public void run() {

    // Creates the client object using Blocking API 

    Mqtt5BlockingClient publisher = Mqtt5Client.builder()
    .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between 
    .serverHost("localhost")  // the host name or IP address of the MQTT server. Kept it localhost for testing. localhost is default if not specified.
    .serverPort(1883)  // specifies the port of the server
    .addConnectedListener(context -> ClientConnectionRetreiver.printConnected("Publisher1"))         // prints a string that the client is connected
    .addDisconnectedListener(context -> ClientConnectionRetreiver.printDisconnected("Publisher1"))  // prints a string that the client is disconnected
    .buildBlocking();  // creates the client builder                
    publisher.connect();  // connects the client
    ClientConnectionRetreiver.getConnectionInfo(publisher);   // gets connection info

    PubSubUtility.printPublising("Publisher1");
    readyToSend = true; 
    x.threadCondNotify();        <<<<< HOW TO MAKE THIS WORK 
    // Think about making the PubClient Thread sleep for a short while so its not too ahead of the client
    startTime = System.currentTimeMillis();

        for (int i = 1; i <= messageNum; i++) { 
             publisher.publishWith()    
             .topic(publisherTopic)   // publishes to the specified topic
             .qos(MqttQos.EXACTLY_ONCE)  
             .payload(convertedMessage)  // the contents of the message 
             .send();
        }

    endTime = System.currentTimeMillis();
    finalTime = endTime - startTime;    
    finalSecTime = TimeUnit.MILLISECONDS.toSeconds(finalTime);

    PubSubUtility.printNumOfPublished("Publisher1", messageNum);    


    publisher.disconnect();  
    }

公共类 CommonThread {

private static final Logger LOGGER = Logger.getLogger(SubMainThread.class.getName());  // Creates a logger instance 


public synchronized void threadCondNotify() {
    notify();
    System.out.println("Notified other thread");
}

public synchronized void threadCondWait() {

    try {       
        while (PubMainThread.readyToSend != true) {
        System.out.println("Waiting for another thread....");
        wait();
        }
    }

    catch (InterruptedException e) {
        LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for another thread", e);
    }
    }

}
4

1 回答 1

-1

在 Sender 中(粗略的 Java 代码,省略了一些细节):

//package statement and imports here

/**
 * Sending thread that signals waiting receivers once its initialization is done.
 *
 * <p>Readiness is recorded in a flag guarded by {@link #x}, so a receiver that arrives after the
 * notification was issued does not block forever. The flag is static and never reset: it is a
 * one-shot signal shared by all {@code Sender} instances.
 */
class Sender extends Thread {
    /**
     * Monitor shared with the receiving side. It must be a dedicated instance: never replace
     * it with {@code Boolean.TRUE}, which is a JVM-wide shared object and unsafe to lock on.
     */
    public static final Boolean x= new Boolean(true);

    // Guarded by x: true once a Sender has finished initializing.
    private static boolean ready = false;

    @Override
    public void run() {
        //initialize here
        synchronized(x) {
            ready = true;      // record the state first so late waiters see it
            x.notifyAll();     // wake every thread already waiting on x
        }
        //send messages here
    }

    /**
     * Reports whether a {@code Sender} has already signalled readiness.
     *
     * @return {@code true} once the readiness block in {@link #run()} has executed
     */
    public static boolean isReady() {
        synchronized(x) {
            return ready;
        }
    }

    /**
     * Blocks until a {@code Sender} has signalled readiness; returns immediately if it already has.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void awaitReady() throws InterruptedException {
        synchronized(x) {
            // Loop protects against spurious wake-ups.
            while (!ready) {
                x.wait();
            }
        }
    }
}

在 Receiver 中(在 Sender 之前开始):

//package statement and imports here

/**
 * Receiving thread that waits on {@code Sender.x} for the sender's signal and then times how
 * long receiving the messages takes.
 */
class Receiver extends Thread {

    @Override
    public void run() {
        //initialize here
        try {
            synchronized(Sender.x) {
                // NOTE(review): a bare wait() has no condition to re-check, so it can miss a
                // notify() issued before this point and may also wake spuriously. This thread
                // must therefore be started (and reach this line) before the Sender notifies.
                Sender.x.wait(); //blocks till Sender.x.notify()
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up: there is nothing meaningful to time.
            Thread.currentThread().interrupt();
            return;
        }
        long start = System.currentTimeMillis();
        //receive messages here
        long end = System.currentTimeMillis();
        // long, not int: the difference of two millisecond timestamps is a long.
        long duration_milliseconds = end - start;
    }
}

也许你必须添加

try{ /* code here */ } catch (InterruptedException e) {}

欢迎讨论直接使用 notify() 和 wait() 的利弊,尤其是在已经提供了丰富并发库的 Java 版本中......

于 2019-06-16T23:28:42.947 回答