Java 是否支持任何队列对象或机制来处理批处理?
例如:我们有一个队列(或任何想要的队列对象),一些生产者将项目一个一个地推入队列,我的目标是当我们在这个队列中有 10 个项目或超过 10 个项目时,我们可以触发一些处理程序来处理它在一批中。
或者它不是自动触发的,我们需要找到一种方法在处理程序端优雅地循环队列。
我们是否有典型的高性能对象或库来处理这个问题?
谢谢,埃姆雷
Batch processing in Queue could be achievable with wait/notify, something like you would block thread call against the resource upto it is available or not.
public class MyQueue implements Queue<Object>{
public synchronized List<Object> peek() {
if(this.list.size()>=10)
this.list.wait();
return Collections.subList(0,10);
}
@Override
public boolean add(Object e) {
this.list.add(e);
if(this.list.size()>=10)
this.list.notifyAll();
return false;
}
}
it is not triggered automatically
In that case you can call wait with specified time out.
您可以使用BlockingQueue.drainTo()
自动获取要执行的任务的批次。这适用于每秒超过 100K 的任务。
如果您需要更高性能的队列,您可以使用更复杂的Disruptor或Java Chronicle,它们每秒可以排队数百万个任务,两者都支持自动批处理。
这是批量处理对象的快速尝试,使用后台线程收集和处理由其他线程推送到队列中的对象:
public abstract class Batcher<E> implements Runnable {
public static interface BatchProcessor<E> {
public void processBatch(List<E> batch);
}
private final BlockingQueue<E> queue;
private final BatchProcessor<E> processor;
private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
this.queue = queue;
this.processor = processor;
}
@Override
public void run() {
try {
while (true) {
List<E> batch = new ArrayList<E>();
for (int i = 0; i < 10; i++) {
batch.add(queue.take());
}
processor.processBatch(batch);
}
} catch (InterruptedException e) {
return;
}
}
}
要使用它,您创建一个BlockingQueue
并在其上放置对象,创建一个实现的实例BatchProcessor
来处理批次,然后创建一个实例Batcher
以将对象从前者泵送到后者。
查看 interface 的 API 文档java.util.Queue
,它有几个实现。
还有一个标准 API,即Java 消息服务 (JMS),用于处理用于在不同进程之间交换消息的排队系统。
我认为CountDownLatch是您需要的,或者可能是CyclicBarrier。这将允许您设置一个同步点,该点将在发生一定数量的操作后触发消费者,并且您可以使用标准队列作为容器对象。