0

短版 - 我正在寻找将两个队列包装成一个以在线程池中使用的最有效方法,只要其中一个队列的内容不为空,然后回退到第二个队列。

长版:系统中有几个线程池,每个都有自己的队列。

“外部”请求来自外部,并被提交到其中一个池,在该池中处理一段时间,然后转移到另一个队列。在第一次传输之后,请求被视为“内部”,直到完成。请求可能会在所有线程池之间传输,并且可能会在它开始的那个线程池中结束。

我想要做的是,如果每个线程池的队列都包含内部请求,则应该首先处理它们,直到队列用完内部请求,然后它应该切换到外部请求。如果出现一个新的内部,那么接下来应该处理它。

不需要纯 FIFO,目前每个线程池使用一个 LinkedTransferQueue,因为当涉及到许多消费者和许多生产者时,这是我发现的最快的一种 FIFO 队列。非常高的插入率和 LinkedTransferQueue 中的自旋锁工作得非常好。

队列的大小在其他地方处理(拒绝也是如此),所以我真的只需要适当的 put/take 行为。

过去对解决方案的尝试没有成功:

  1. PriorityBlockingQueue - 太慢了。由于某种原因,锁很糟糕,尤其是在我的硬件上。(2.4 ghz 四核笔记本电脑几乎胜过 2x 2.0ghz 六核服务器)
  2. 手动包装两个 LinkedBlockingQueue - 工作正常,但也太慢了。使用 ONE LinkedBlockingQueue 也太慢了。
  3. 手动包装两个 ArrayBlockingQueue - 与 LinkedBlockingQueue 相同。

解决思路:

  • 手动“合并”两个 LinkedTransferQueues ......但是在查看了它的代码之后,恐怕我不够聪明,无法做到这一点而不会破坏某些东西。
  • 旋转一段时间,在两个队列上执行 peek() 直到我找到一些东西......这基本上是一个自旋锁,我想我可以这样做一段时间,但我想我想要与 LinkedTransferQueue 相同的行为,那是。旋转一会儿,然后让步或休眠一会儿。但我没有办法醒来。(编辑:如果我能避免“真正的”锁,我真的更喜欢)

不能只在 External 上执行 peek() 并在 Internal 上执行 if null 块,因为可能会将某些内容插入到 External 中,而我将被阻止等待 Internal。我永远不会从内部解除阻塞,因为除非我从外部处理它们,否则事情不会在内部结束......如果我将它旋转 180 度,同样的方式(除了它来自系统内部)

我为一个毫无意义的帖子道歉......

4

1 回答 1

0

尝试了另一种方法,一个队列提供另一个队列并在处理太多“内部”请求时阻塞......锁将性能降低了一半。

最后我这样做了:将两个 LinkedTransferQueues 放在一个实现 BlockingQueue 的类中。让“offer”方法决定它最终进入哪个队列,而 take 方法执行以下操作(不是确切的代码):

@Override
public Runnable take() throws InterruptedException {
    int c = 0;
    ThreadLocalRandom rand = ThreadLocalRandom.current();

    while (true) {
        // Check internal first
        Runnable r = internalQueue.poll();
        if (r != null) {
            return r;
        }

        // Spin for a while on external, with occasional yield.
        while(c < 128)
        {
            r = externalQueue.poll();
            if (r != null) {
                return r;
            }

            if(rand.nextInt(32) == 0)
            {
                Thread.yield();
            }

            c++;
        }

        // Use the transfer queues time limited poll.
        r = externalQueue.poll(10000, TimeUnit.NANOSECONDS);
        if (r != null) {
            return r;
        }
    }
}

信不信由你,这实际上将吞吐量提高了 15% 并将延迟减少了一半。较少的“开始但未完成”请求占用资源,并且队列中的争用可能会减少一点(由于我的使用场景)。

自旋锁的值和思想取自 LinkedTransferQueue 的代码。

不利的一面是,使用上述代码,系统空闲率为 11%,但可以通过在 30 秒不活动后增加轮询超时来轻松解决此问题。

如果有一天有人碰巧遇到一个更好的解决方案,仍然欢迎,但现在这可行。

于 2013-07-01T17:07:01.537 回答