5

在一个大容量的多线程 java 项目中,我需要实现一个非阻塞缓冲区。

在我的场景中,我有一个每秒接收约 20,000 个请求的 Web 层。我需要在某些数据结构(也就是所需的缓冲区)中累积其中的一些请求,并且当它已满时(假设它包含 1000 个对象时已满)这些对象应该被序列化为一个文件,该文件将被发送到另一台服务器用于进一步处理。

实现应该是非阻塞的。我检查了ConcurrentLinkedQueue但我不确定它是否适合这项工作。

我认为我需要使用 2 个队列,一旦第一个队列被填满,它就会被一个新队列替换,并且整个队列(“第一个”)被传递以进行进一步处理。这是我目前正在考虑的基本想法,但我仍然不知道它是否可行,因为我不确定我是否可以在 java 中切换指针(以便切换完整队列)。

有什么建议吗?

谢谢

4

5 回答 5

4

我通常对这样的需求做的是在应用程序启动时创建一个缓冲区池并将引用存储在 BlockingQueue 中。生产者线程弹出缓冲区,填充它们,然后将引用推送到消费者正在等待的另一个队列。当消费者/s 完成后,(在您的情况下,数据写入正常),引用被推回池队列以供重用。这提供了大量的缓冲区存储,不需要在锁内进行昂贵的批量复制,消除 GC 操作,提供流控制,(如果池为空,生产者被迫等待直到返回一些缓冲区),并防止内存失控,多合一设计。

更多:多年来,我也在其他各种语言(C++、Delphi)中使用了这样的设计,而且效果很好。我有一个包含 BlockingQueue 的“ObjectPool”类和一个用于派生缓冲区的“PooledObject”类。PooledObject 对其池有一个内部私有引用(它在创建池时被初始化),因此允许使用无参数的 release() 方法。这意味着,在具有多个池的复杂设计中,缓冲区总是被释放到正确的池中,从而减少了潜在的问题。

我的大多数应用程序都有一个 GUI,所以我通常每秒将池级别转储到计时器上的状态栏。然后我可以大致看到有多少负载,如果任何缓冲区泄漏,(数量持续下降,然后应用程序最终在空池上死锁),或者我正在双重释放,(数量持续上升并且应用程序最终崩溃)。

在运行时更改缓冲区的数量也相当容易,通过创建更多并将它们推入池中,或者通过等待池,删除缓冲区并让 GC 销毁它们。

于 2013-10-31T09:43:10.957 回答
3

我认为您的解决方案有一个很好的观点。您将需要两个队列,这processingQueue将是您想要的缓冲区大小(在您的示例中为 1000),而waitingQueue会更大。每次processingQueue满时,它会将其内容放入指定的文件中,然后从 中获取前 1000 个waitingQueue(如果等待队列少于 1000,则获取更少)。

我对此唯一担心的是你提到每秒 20000 和 1000 的缓冲区。我知道 1000 是一个例子,但如果你不让它更大,那可能只是你将问题转移到waitingQueue而不是解决它,因为您waitingQueue将比processingQueue处理它们更快地收到 1000 个新的,从而在waitingQueue.

于 2013-10-31T09:03:47.103 回答
1

我可能弄错了,但您可以使用 an ArrayList,因为您不需要从队列中轮询每个元素。当数组的大小达到限制并且您需要发送它时,您只需在同步部分中刷新(创建副本并清除)您的数组。添加到这个列表也应该同步到这个刷新操作。

交换你的数组可能不安全——如果你的发送速度比你的生成速度慢,缓冲区可能很快就会开始相互覆盖。每秒 20000 个元素的数组分配对于 GC 来说几乎是微不足道的。

Object lock  = new Object();

List list = ...;

synchronized(lock){
    list.add();
}

...

// this check outside is a quick dirty check for performance, 
// it's not valid out of the sync block
// this first check is less than nano-second and will filter out 99.9%
// `synchronized(lock)` sections
if(list.size() > 1000){
  synchronized(lock){  // this should be less than a microsecond
     if(list.size() > 1000){  // this one is valid
       // make sure this is async (i.e. saved in a separate thread) or <1ms
       // new array allocation must be the slowest part here
       sendAsyncInASeparateThread(new ArrayList(list)); 
       list.clear();
     }
  }
}

更新

考虑到发送是异步的,这里最慢的部分new ArrayList(list)应该是 1000 个元素的大约 1 微秒和每秒 20 微秒。我没有测量,我从大约 1 毫秒内分配 100 万个元素的比例解决了这个问题。

如果你仍然需要一个超快的同步队列,你可能想看看MentaQueue

于 2013-10-31T09:32:55.923 回答
1

不是将每个请求对象放入队列,而是分配一个大小为 1000 的数组,当它被填满时,将该数组放入队列中,发送到发送者线程,该线程序列化并发送整个数组。然后分配另一个数组。

当发件人无法足够快地工作并且其队列已溢出时,您将如何处理这种情况?为避免内存不足错误,请使用大小有限的队列。

于 2013-10-31T10:10:12.170 回答
0

“切换指针”是什么意思?Java 中没有指针(除非您在谈论引用)。

无论如何,正如您可能从 Javadoc 中看到的那样,ConcurrentLinkedQueuesize() 方法存在“问题”。不过,您可以使用您最初的想法,即 2 个(或更多)缓冲区会被切换。磁盘 I/O 可能会出现一些瓶颈。也许 size() 的非常量时间在这里也不会成为问题。

当然,如果您希望它是非阻塞的,您最好拥有大量内存和快速磁盘(以及大/更大的缓冲区)。

于 2013-10-31T09:04:33.867 回答