0

我在一个目录中有一个可变的文件列表,并且我在 Java 中有不同的线程来处理它们。线程是可变的,取决于当前的处理器

int numberOfThreads=Runtime.getRuntime().availableProcessors();

File[] inputFilesArr=currentDirectory.listFiles();

如何跨线程统一拆分文件?如果我做简单的数学,比如

int filesPerThread=inputFilesArr.length/numberOfThreads

inputFilesArr.length那么如果和numberOfThreads不能完全被彼此整除,我可能最终会丢失一些文件。什么是这样做的有效方法,以便所有线程的分区和负载是统一的?

4

5 回答 5

2

这是对这个问题的另一种看法:

  1. 使用 java 的ThreaPoolExecutor. 这是一个例子
  2. 它的工作原理是线程池您不需要每次都创建线程,而是在开始时创建指定数量的线程并使用池中的线程
  3. 想法是将目录中每个文件的处理视为独立的任务,由每个线程执行。
  4. 现在,当您将所有任务提交给循环执行器时(这确保没有遗漏任何文件)。
  5. Executor 实际上会将所有这些任务添加到队列中,同时它会从线程池中提取线程并将任务分配给它们,直到所有线程都忙。
  6. 它一直等到一个线程可用。所以在这里配置线程池大小是至关重要的。您可以拥有与文件数量一样多的线程,也可以拥有比该数量更少的线程。

在这里,我假设要处理的每个文件都是相互独立的,并且不需要单个线程处理特定的一堆文件。

于 2012-12-26T06:45:54.530 回答
1

您可以使用循环算法获得最佳分布。这是伪代码:

ProcessThread t[] = new ProcessThread[Number of Cores];
int i = 0;
foreach(File f in files)
{
    t[i++ % t.length].queueForProcessing(f);
}

foreach(Thread tt in t)
{
    tt.join();
}
于 2012-12-26T05:35:03.893 回答
1

生产者消费者模式将优雅地解决这个问题。让一个生产者(主线程)将所有文件放在一个绑定的阻塞队列中(参见BlockingQueue)。然后让一些工作线程从队列中取出一个文件并处理它。

工作(而不是文件)将均匀分布在线程上,因为处理完一个文件的线程会要求处理下一个文件。这避免了一个线程只分配大文件来处理,而其他线程只分配小文件来处理的可能问题。

于 2012-12-26T06:35:08.517 回答
0

您可以尝试获取每个线程的文件范围(inputFilesArr 中的开始和结束索引):

if (inputFilesArr.length < numberOfThreads)
        numberOfThreads = inputFilesArr.length;

int[][] filesRangePerThread = getFilesRangePerThread(inputFilesArr.length, numberOfThreads);

private static int[][] getFilesRangePerThread(int filesCount, int threadsCount)
{
    int[][] filesRangePerThread = new int[threadsCount][2];

    if (threadsCount > 1)
    {
        float odtRangeIncrementFactor = (float) filesCount / threadsCount;
        float lastEndIndexSet = odtRangeIncrementFactor - 1;
        int rangeStartIndex = 0;
        int rangeEndIndex = Math.round(lastEndIndexSet);

        filesRangePerThread[0] = new int[] { rangeStartIndex, rangeEndIndex };

        for (int processCounter = 1; processCounter < threadsCount; processCounter++)
        {
            rangeStartIndex = rangeEndIndex + 1;
            lastEndIndexSet += odtRangeIncrementFactor;
            rangeEndIndex = Math.round(lastEndIndexSet);
            filesRangePerThread[processCounter] = new int[] { rangeStartIndex, rangeEndIndex };
        }
    }
    else
    {
        filesRangePerThread[0] = new int[] { 0, filesCount - 1 };
    }

    return filesRangePerThread;
}
于 2012-12-26T05:53:18.150 回答
0

如果您正在处理 I/O,即使使用一个处理器,多个线程也可以并行工作,因为当一个线程正在等待 read(byte[]) 时,处理器可以运行另一个线程。

无论如何,这是我的解决方案

int nThreads = 2;
File[] files = new File[9];
int filesPerThread = files.length / nThreads;

class Task extends Thread {
    List<File> list = new ArrayList<>();
                // implement run here
}

Task task = new Task();
List<Task> tasks = new ArrayList<>();
tasks.add(task);
for (int i = 0; i < files.length; i++) {
    if (task.list.size() == filesPerThread && files.length - i >= filesPerThread) {
        task = new Task();
        tasks.add(task);
    }
    task.list.add(files[i]);
}
for(Task t : tasks) {
    System.out.println(t.list.size());
}

印刷 4 5

请注意,如果您有 3 个文件和 5 个处理器,它将创建 3 个线程

于 2012-12-26T06:47:21.230 回答