PriorityBlockingQueue 在您的情况下似乎没有意义,因为您只想按原始顺序处理消息。
如果你真的想自己处理队列,你可以使用有界的 LinkedBlockingQueue:
//example with a limit of 100,000 messages being in the queue at any one time
private static final BlockingQueue<Message> queue =
new LinkedBlockingQueue<> (100_000);
在生产者中,您继续做:
Message msg = getMessage();
queue.put(msg); //blocks if the queue is full
在消费者中:
Message msg = queue.take(); //blocks until there is a message
Peter Lawrey 的替代方案包括:
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
在你的制片人中:
final Message msg = getMessage();
Runnable task = new Runnable() {
public void run() { process(msg); }
}
executor.submit(task);
并且没有消费者,因为您的生产者创建了它(任务)。
注意:在线程池示例中,我使用了 10 个线程,假设该process
方法主要受 CPU 限制,并且您有大约 10 个处理器。在实践中:
- 如果是这种情况(受 CPU 限制),您将使用
Runtime.getRuntime().availableProcessors()
获取处理器数量而不是硬编码数字。
- 如果不是(I/O 限制),您将使用更多线程 - 很难预先估计最佳数量,您需要使用不同的数字来分析您的应用程序以找到最佳值。