4

我有一个包含数千行的数据文件。我正在阅读它们并将它们保存在数据库中。我想以 50 行的批次对这个过程进行多线程处理。正如我在文件中所读到的,有 10 行被提交给 ExecutorService。

ExecutorService executor = Executors.newFixedThreadPool(5);`

我可以在一段时间循环中执行以下操作,直到我的行结束......

 Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);

但是,如果处理 10 行需要时间,我不想将整个文件读入内存。我只想提交 5 个,直到其中一个线程返回,然后我提交下一个。

假设一个线程需要 20 秒来保存 10 条记录,我不希望它ExecutorService被喂上千行,因为读取过程正在继续读取并提交给 ExecutorService

实现这一目标的最佳方法是什么?

4

1 回答 1

3

您可以使用LinkedList<Future<?>>存储期货来执行此操作,直到您达到某个预定大小。下面是一些框架代码,应该可以帮助您完成大部分工作:

int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();

//As long as there are rows to save:
while(moreRowsLeft()){
    //dump another callable onto the queue:
    futures.addLast(service.submit(new RowSavingCallable());

    //if the queue is "full", wait for the next one to finish before
    //reading any more of the file:
    while(futures.size() >= 2*threads) futures.removeFirst().get();
}

//All rows have been submitted but some may still be writing to the DB:
for(Future<?> f : futures) future.get();

//All rows have been saved at this point

您可能想知道为什么我允许期货数量达到机器上线程数量的两倍 - 这允许执行程序服务线程在主线程创建更多工作时处理数据库保存。这可以帮助隐藏与在工作线程忙于执行数据库写入时使更多可调用可用于处理相关的任何 I/O 成本。

于 2015-10-28T21:47:32.853 回答