0

我正在尝试处理从 MQ 基础架构获得的一些消息。我有两个阻塞队列,sharedQueue并且pubQueue. sharedqueue填满了我从 MQ 基础架构获得的消息,如下所示。它将消息放入sharedQueue.

client.setCallback(new CallBack("inst", sharedQueue));

messagemanipulator 线程将从中读取sharedQueue,处理它并将响应放入以pubQueue供以后发布。

new MessageManipulatorThread(sharedQueue,pubQueue).run();

发布者线程将从 MQ 基础架构获取消息pubQueue并将其发布到 MQ 基础架构。

新的 PublisherThread(pubQueue).run();

以下是完整代码:

 public class ArrayBlockingQueueExample {

 private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
 private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);


public static void main(String[] args) throws MqttException, Exception {

    new ArrayBlockingQueueExample().startThreads();

}

public void startThreads() throws MqttException, Exception{

    MqttClient client =  new MQTTClientFactory().getInstance();
    client.setCallback(new CallBack("inst", sharedQueue));

    new MessageManipulatorThread(sharedQueue,pubQueue).run();
    new PublisherThread(pubQueue).run();


}



 public MessageManipulatorThread( BlockingQueue<String> sharedQueue , BlockingQueue<String> pubQueue){

    this.sharedQueue = sharedQueue;
    this.pubQueue = pubQueue;

}

    public void run() {
        while (true) {
            try {
                String msg = sharedQueue.take();
                System.out.println(Thread.currentThread().getName() + "manipulator runnning => "+msg);
                pubQueue.put(msg);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


 }

public class PublisherThread implements Runnable {

private BlockingQueue<String> sharedQueue;

public PublisherThread(BlockingQueue<String> sharedQueue){

    this.sharedQueue = sharedQueue;

}

     public void run() {
         while (true) {
             System.out.println("Running pub");
             try {
                 System.out.println("pub=>"+sharedQueue.take() );
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
     }

  }

问题new PublisherThread(pubQueue).run();永远不会解决。我猜这是一个线程同步问题pubQueue。应该等到它有任何数据被填充MessageManipulatorThread,但它看起来不像那样。PublisherThread它正在等待pubQueue释放,但它永远不会变得免费!,还有什么我应该为此做的吗?任何帮助深表感谢。

4

2 回答 2

6

您正在使用Runnable.run()而不是Thread.start(),所以这是:

new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();

不会工作。那是因为run()实际上在当前线程内部运行了runnable的方法,而不是创建一个新线程并单独执行它。

相反,请执行以下操作:

new Thread(new MessageManipulatorThread(sharedQueue,pubQueue), "MessageManipulatorThread").start();
new Thread(new PublisherThread(pubQueue), "PublisherThread").start();

编辑:

fge 在问题中发表了以下评论:

为什么不使用 anExecutorService而不是手动完成它的工作?

为了澄清他的意思,他的意思是使用 anExecutorService来处理消息,pubQueue而不是创建线程来提取消息并手动处理它们。该代码如下所示:

ExecutorService executor = Executors.newSingleThreadExecutor();
new Thread(new MessageManipulatorThread(sharedQueue, executor), "MessageManipulatorThread").start();

然后MessageManipulatorThread班级将更改为:

public class MessageManipulatorThread implements Runnable {

    private BlockingQueue<String> sharedQueue;
    private ExecutorService executor;

    public MessageManipulatorThread(BlockingQueue<String> sharedQueue, ExecutorService executor){
        this.sharedQueue = sharedQueue;
        this.executor = executor;
    }

    public void run() {
        while (true) {
            try {
                String msg = sharedQueue.take();
                System.out.println(Thread.currentThread().getName() + "manipulator runnning => "+msg);
                executor.execute(new PublisherThread(msg));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

然后,您将进行更改PublisherThread,以便它只使用处理传递给它的单个消息。这是您尝试做的另一种方法。

这种方法也允许一些灵活性。使用另一种方法,PublisherThread一次只能处理一条消息(同步)。使用该ExecutorService接口允许您更改实现,这可以允许它一次处理多个消息(异步),只需更改以下内容:

ExecutorService executor = Executors.newSingleThreadExecutor();

对此:

ExecutorService executor = Executors.newFixedThreadPool(10);

该语句允许执行程序最多启动 10 个线程,这意味着一次最多可以处理 10 条消息。有关创建实现Executors的更多方法,请参见该类。ExecutorService

于 2013-07-16T16:26:47.360 回答
1

这两行是你的问题:

new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();

你应该首先让你的“线程”类扩展Thread而不是仅仅实现Runnable,然后调用它:

new MessageManipulatorThread(sharedQueue,pubQueue).start();
new PublisherThread(pubQueue).start();

如所写,您的代码实际上并没有产生任何新线程,因此第一个run()方法永远不会返回,而您的第二个run()方法永远不会被调用。

于 2013-07-16T16:29:05.307 回答