0

我正在使用LinkedBlockingQueue它被多个线程填充,项目数量很大(数千万个对象)。

LinkedBlockingQueue.take()花费大量时间(通过分析器检查) - 56% 的时间。
队列永远是空的!!

什么会影响take()方法的性能?

更新:我对处理 take() 结果的代码进行了一些更改,我还将 take() 放到另一个线程中,性能几乎提高了 50%。

我不明白这是怎么可能的,因为我没有改变推杆的任何逻辑......

更新:

在调用 take() 之前,我计算了队列已满的次数:
使用原始代码,90% 的调用队列已满。
使用改进的代码 13% 的呼叫队列已满。

4

2 回答 2

5

如果我们查看这里的代码,我们可以看到在获取元素之前必须获取锁。如果有很多线程正在占用,那么这个锁就会发生争用——线程不是在等待某些东西出现,而是在等待其他线程来占用。

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

编辑

如果您有一个接受者和很多推杆并且队列不断满员,则signalNotFull()此代码将给出瓶颈

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

这需要用 来putLock表示队列中现在有空间的事实。

于 2013-06-19T14:29:47.623 回答
5

take() 的重量相当轻,但如果你调用它足够多,它会消耗大量的 CPU。听起来您正在传递大量对象,而消费者只需很少的工作。我建议尝试重组您的问题,以便更有效地完成。

例如,您可以

  • 使用专为此类问题设计的 Disruptor。
  • 发送数据块/批次,例如对象列表而不是许多单个对象。
  • 使用多个交换器(适合少数作者)
  • 如果您的目标是持久化数据,请使用 Chronicle 之类的东西。

您的个人资料也可能不完全准确。当您测量一小段时间时,它可能会产生不同的结果。

顺便说一句:take() 是单线程的。如果您有许多线程试图同时调用 take(),它们将相互阻塞。

于 2013-06-19T14:29:24.387 回答