我正在尝试处理从 MQ 基础架构获得的一些消息。我有两个阻塞队列,sharedQueue
并且pubQueue
. sharedqueue
填满了我从 MQ 基础架构获得的消息,如下所示。它将消息放入sharedQueue
.
client.setCallback(new CallBack("inst", sharedQueue));
messagemanipulator 线程将从中读取sharedQueue
,处理它并将响应放入以pubQueue
供以后发布。
new MessageManipulatorThread(sharedQueue,pubQueue).run();
发布者线程将从 MQ 基础架构获取消息pubQueue
并将其发布到 MQ 基础架构。
新的 PublisherThread(pubQueue).run();
以下是完整代码:
public class ArrayBlockingQueueExample {
private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);
public static void main(String[] args) throws MqttException, Exception {
new ArrayBlockingQueueExample().startThreads();
}
public void startThreads() throws MqttException, Exception{
MqttClient client = new MQTTClientFactory().getInstance();
client.setCallback(new CallBack("inst", sharedQueue));
new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();
}
public MessageManipulatorThread( BlockingQueue<String> sharedQueue , BlockingQueue<String> pubQueue){
this.sharedQueue = sharedQueue;
this.pubQueue = pubQueue;
}
public void run() {
while (true) {
try {
String msg = sharedQueue.take();
System.out.println(Thread.currentThread().getName() + "manipulator runnning => "+msg);
pubQueue.put(msg);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PublisherThread implements Runnable {
private BlockingQueue<String> sharedQueue;
public PublisherThread(BlockingQueue<String> sharedQueue){
this.sharedQueue = sharedQueue;
}
public void run() {
while (true) {
System.out.println("Running pub");
try {
System.out.println("pub=>"+sharedQueue.take() );
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
问题new PublisherThread(pubQueue).run();
永远不会解决。我猜这是一个线程同步问题pubQueue
。应该等到它有任何数据被填充MessageManipulatorThread
,但它看起来不像那样。PublisherThread
它正在等待pubQueue
释放,但它永远不会变得免费!,还有什么我应该为此做的吗?任何帮助深表感谢。