6

Java 是否支持任何队列对象或机制来处理批处理?

例如:我们有一个队列(或任何想要的队列对象),一些生产者将项目一个一个地推入队列,我的目标是当我们在这个队列中有 10 个项目或超过 10 个项目时,我们可以触发一些处理程序来处理它在一批中。

或者它不是自动触发的,我们需要找到一种方法在处理程序端优雅地循环队列。

我们是否有典型的高性能对象或库来处理这个问题?

谢谢,埃姆雷

4

5 回答 5

2

Batch processing in Queue could be achievable with wait/notify, something like you would block thread call against the resource upto it is available or not.

public class MyQueue implements Queue<Object>{
        public synchronized List<Object> peek() {
        if(this.list.size()>=10)
                  this.list.wait();
        return Collections.subList(0,10);
    }
        @Override
    public boolean add(Object e) {
        this.list.add(e);
                if(this.list.size()>=10)
                  this.list.notifyAll(); 
        return false;
    }
}

it is not triggered automatically

In that case you can call wait with specified time out.

于 2012-10-08T09:17:44.133 回答
1

您可以使用BlockingQueue.drainTo()自动获取要执行的任务的批次。这适用于每秒超过 100K 的任务。

如果您需要更高性能的队列,您可以使用更复杂的DisruptorJava Chronicle,它们每秒可以排队数百万个任务,两者都支持自动批处理。

于 2012-10-08T09:29:13.050 回答
1

这是批量处理对象的快速尝试,使用后台线程收集和处理由其他线程推送到队列中的对象:

public abstract class Batcher<E> implements Runnable {

    public static interface BatchProcessor<E> {
        public void processBatch(List<E> batch);
    }

    private final BlockingQueue<E> queue;
    private final BatchProcessor<E> processor;

    private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    @Override
    public void run() {
        try {
            while (true) {
                List<E> batch = new ArrayList<E>();
                for (int i = 0; i < 10; i++) {
                    batch.add(queue.take());
                }
                processor.processBatch(batch);
            }
        } catch (InterruptedException e) {
            return;
        }
    }

}

要使用它,您创建一个BlockingQueue并在其上放置对象,创建一个实现的实例BatchProcessor来处理批次,然后创建一个实例Batcher以将对象从前者泵送到后者。

于 2012-10-08T10:37:20.207 回答
0

查看 interface 的 API 文档java.util.Queue,它有几个实现。

还有一个标准 API,即Java 消息服务 (JMS),用于处理用于在不同进程之间交换消息的排队系统。

于 2012-10-08T08:55:36.063 回答
0

我认为CountDownLatch是您需要的,或者可能是CyclicBarrier。这将允许您设置一个同步点,该点将在发生一定数量的操作后触发消费者,并且您可以使用标准队列作为容器对象。

于 2012-10-08T09:00:58.917 回答