3

我有一个程序处理大量文件,对于每个文件,需要做两件事:首先,读取和处理文件的一部分,然后MyFileData存储结果。第一部分可以并行化,第二部分不能。

按顺序做所有事情都非常慢,因为 CPU 必须等待磁盘,然后它会工作一点,然后它发出另一个请求,然后再次等待......

我做了以下

class MyCallable implements Callable<MyFileData> {
    MyCallable(File file) {
        this.file = file;
    }
    public MyFileData call() {
        return someSlowOperation(file);
    }
    private final File file;
}

for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());

它帮助很大。但是,我想改进两点:

  • sequentialOperation固定顺序执行,而不是首先处理任何可用的结果。我怎样才能改变它?

  • 有数千个文件需要处理,启动数千个磁盘请求可能会导致磁盘垃圾。通过使用Executors.newFixedThreadPool(10)我限制了这个数字,但是我正在寻找更好的东西。理想情况下,它应该是自调整的,以便在不同的计算机上工作得最佳(例如,当RAID和/或NCQ可用时发出更多请求等)。我不认为它可以基于找出硬件配置,但测量处理速度并基于它进行优化应该可能的。任何的想法?

4

2 回答 2

6

顺序操作以固定顺序执行,而不是先处理任何可用的结果。我怎样才能改变它?

这正是CompletionService所做的:它并行处理任务并在它们完成时返回它们,而不管提交顺序如何。

简化(未测试)示例:

int NUM_THREADS = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);

for (File f : files) futures.add(completionService.submit(new MyCallable(f)));

for(int i = 0; i < futures.size(); i++) {
    Future<MyFileData> next = completionService.take();
    sequentialOperation(next.get());
}

有数千个文件需要处理,启动数千个磁盘请求可能会导致磁盘垃圾。通过使用 Executors.newFixedThreadPool(10) 我限制了这个数字,但是我正在寻找更好的东西。

我不是 100% 确定那个。我想这取决于你有多少磁盘,但我认为磁盘访问部分不应该分成太多线程(每个磁盘一个线程可能是明智的):如果多个线程同时访问一个磁盘,它会花更多的时间寻找而不是阅读。

于 2012-07-20T11:34:02.337 回答
2

顺序操作以固定顺序执行,而不是先处理任何可用的结果。我怎样才能改变它?

假设:每个someSlowOperation(file);呼叫都将花费可变的时间,因此,您希望在MyFileData收到它后立即处理,但不要与另一个同时处理sequentialOperation

您可以通过设置生产者/消费者队列来实现这一点。

生产者是callables您在示例中执行的,添加了一些位,您可以将结果添加到等待处理的工作队列中。

消费者是sequentialOperation()调用 - 它在自己的线程中运行,并且只有一个。这个线程所做的只是获取队列的头部,并处理它,重复直到程序结束。

这样,您可以最大限度地利用机器上的所有资源。

带有一些示例代码的相关帖子:Producer/Consumer threads using a Queue

编辑:我想你可能想要一个快速的样本,因为它对以前从未做过的人来说非常不透明

public class Main {

    private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10);
    private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
    private final LinkedBlockingQueue<MyData> queue = new LinkedBlockingQueue();//or some other impl

    abstract class Producer implements Runnable{
        private final File file;
        Producer(File file) {
            this.file = file;
        }

        public void run() {
            MyData result = someLongAssOperation(file);
            queue.offer(result);
        }

        public abstract void someLongAssOperation(File file);
    }

    abstract class Consumer implements Runnable {
        public void run() {
            while (true) {
                sequentialOperation(queue.take());  
            }
        }

        public abstract void sequentialOperation(MyData data);
    } 

    private void start() {
        consumerExecutor.submit(new Consumer(){
            //implement sequentialOperation here
        });

        for (File f : files) {
            producerExecutor.submit(new Producer(file) {
                //implement the someLongAssOperation()
            });
        }

    }

    public static void main(String[] args) {
        new Main().start();     
    } 

}
于 2012-07-20T12:00:09.347 回答