5

我有一个多线程应用程序,它有一个生产者线程和几个消费者线程。数据存储在共享线程安全集合中,并在缓冲区中有足够数据时刷新到数据库。

从 javadocs -

BlockingQueue<E>

一个队列,它还支持在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用的操作。

take()

检索并删除此队列的头部,如有必要,等待元素可用。

我的问题-

  1. 是否有另一个具有 E[] take(int n) 方法的集合?即阻塞队列一直等到一个元素可用。我想要的是它应该等到 100 或 200 个元素可用。
  2. 或者,是否有另一种方法可以在不进行轮询的情况下解决问题?
4

4 回答 4

2

drainTo方法并不完全是您正在寻找的,但它会达到您的目的吗?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection , int)

编辑

take您可以使用和的组合来实现性能稍高的批处理阻塞 takemin drainTo

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException
{
  int drained = 0;
  do 
  {
    if (queue.size() > 0)
      drained += queue.drainTo(list, min - drained);
    else
    {
      list.add(queue.take());
      drained++;
    }
  }
  while (drained < min);
}
于 2012-06-07T10:08:27.463 回答
2

我认为唯一的方法是扩展某些实现BlockingQueue或使用以下方法创建某种实用程序方法take

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
        throws InterruptedException {

    for (int i = 0; i < max; i++)
        to.add(queue.take());
}
于 2012-06-07T10:16:24.610 回答
1

我不确定标准库中是否有类似的类具有take(int n)类型方法,但是您应该能够包装默认值BlockingQueue以添加该函数而无需太多麻烦,您不觉得吗?

替代方案是触发将元素放入集合中的操作,您设置的阈值将触发刷新。

于 2012-06-07T10:08:22.050 回答
1

所以这应该是一个线程安全的队列,可以让你阻止获取任意数量的元素。欢迎更多的眼睛来验证线程代码是否正确。

package mybq;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ChunkyBlockingQueue<T> {
    protected final LinkedList<T> q = new LinkedList<T>();
    protected final Object lock = new Object();

    public void add(T t) {
        synchronized (lock) {
            q.add(t);
            lock.notifyAll();
        }
    }

    public List<T> take(int numElements) {
        synchronized (lock) {
            while (q.size() < numElements) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            ArrayList<T> l = new ArrayList<T>(numElements);
            l.addAll(q.subList(0, numElements));
            q.subList(0, numElements).clear();
            return l;
        }
    }
}
于 2012-06-07T10:18:19.667 回答