0

我需要使用两个线程创建一个生产者-消费者 realtionshiop。一种是从磁盘上的文件中读取文本对象,然后将对象插入 FIFO 中的队列中,消费者线程正在从队列中读取以处理对象。但是我对使用什么类感到困惑?LinkedBlockingQueue 还是 PriorityBlockingQueue?甚至更好的东西?

目标和宗旨:

我正在尝试实时对推文进行聚类,但是我将 csv 文件中的推文存档,而不是使用 Twitter Streaming API。所以我试图通过从文件中读取推文并将它们放入队列中来模拟流的效果,然后消费者开始从队列中读取。我有非常大的 csv 文件,因此我更像是一个流媒体场景。因此,当我收到推文时,消费者正在从队列中获取推文并实时将它们聚集在一起。

4

2 回答 2

3

PriorityBlockingQueue 在您的情况下似乎没有意义,因为您只想按原始顺序处理消息。

如果你真的想自己处理队列,你可以使用有界的 LinkedBlockingQueue:

//example with a limit of 100,000 messages being in the queue at any one time
private static final BlockingQueue<Message> queue =
                                      new LinkedBlockingQueue<> (100_000);

在生产者中,您继续做:

Message msg = getMessage();
queue.put(msg); //blocks if the queue is full

在消费者中:

Message msg = queue.take(); //blocks until there is a message

Peter Lawrey 的替代方案包括:

private static final ExecutorService executor = Executors.newFixedThreadPool(10);

在你的制片人中:

final Message msg = getMessage();
Runnable task = new Runnable() {
    public void run() { process(msg); }
}
executor.submit(task);

并且没有消费者,因为您的生产者创建了它(任务)。


注意:在线程池示例中,我使用了 10 个线程,假设该process方法主要受 CPU 限制,并且您有大约 10 个处理器。在实践中:

  • 如果是这种情况(受 CPU 限制),您将使用Runtime.getRuntime().availableProcessors()获取处理器数量而不是硬编码数字。
  • 如果不是(I/O 限制),您将使用更多线程 - 很难预先估计最佳数量,您需要使用不同的数字来分析您的应用程序以找到最佳值。
于 2013-06-26T22:20:41.430 回答
1

您可以使用 LinkedBlockingQueue,但通常使用包装队列和线程池的 ExecutorService 更简单。您可以为每个作业提交一个任务。

PriorityBlockingQueue 用于设置任务的优先级。

我会查看文档中的示例。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

于 2013-06-26T21:50:39.683 回答