3

我正在尝试使用线程对文件进行排序。这是 Sort.java :

这个函数在线程的帮助下排序

public static String[] threadedSort(File[] files) throws IOException {
      String sortedData[] = new String[0]; 
      int counter = 0; 
      boolean allThreadsTerminated = false;
      SortingThread[] threadList = new SortingThread[files.length];
      for (File file : files) {
          String[] data = getData(file);
          threadList[counter] = new SortingThread(data);
          threadList[counter].start();
          counter++;
      }
      while(!allThreadsTerminated) {
          allThreadsTerminated = true;
          for(counter=0; counter<files.length; counter++) {
              if(threadList[counter].getState() != Thread.State.TERMINATED) {
                  allThreadsTerminated = false;               
              }           
          }
      }
      for(counter=0; counter<files.length; counter++) {
          sortedData = MergeSort.merge(sortedData, threadList[counter].data);
      }
      return sortedData;
 }

此功能正常排序

  public static String[] sort(File[] files) throws IOException {
    String[] sortedData = new String[0];
    for (File file : files) {
      String[] data = getData(file);
      data = MergeSort.mergeSort(data);
      sortedData = MergeSort.merge(sortedData, data);
    }
    return sortedData;
  }

现在,当我使用两种方式进行排序时,正常排序比线程版本更快。这可能是什么原因?我错过了什么吗?

我的 SortingThread 是这样的:

public class SortingThread extends Thread {
    String[] data;
    SortingThread(String[] data) {
        this.data = data;
    }
    public void run() {
         data = MergeSort.mergeSort(data);        
    }  
}

当我通过将其性能与原始非线程实现进行比较来分析我的线程实现时,我发现第二个更快。这种行为的原因是什么?如果我们谈论相对性能改进,我们期望线程实现更快,如果没有错的话。

编辑:假设我有正常功能的 MergeSort。但是在这里发布它的代码是没有用的。getData() 函数也只是从文件中获取输入。我认为问题在于将整个文件放入数组中。我认为我应该为不同的线程提供不同的行:

private static String[] getData(File file) throws IOException {
    ArrayList<String> data = new ArrayList<String>();
    BufferedReader in = new BufferedReader(new FileReader(file));
    while (true) {
      String line = in.readLine();
      if (line == null) {
        break;
      }
      else {
        data.add(line);
      }
    }


    in.close();
    return data.toArray(new String[0]);
  }
4

4 回答 4

1

首先,你如何测量经过的时间?您是否在同一个程序中执行这两个测试?如果是这样,请记住,在执行第一个测试时,mergesort 可能会进行 Hotspot 编译。我建议您将每种方法运行两次,在第二次运行时测量时间

于 2015-06-07T07:59:21.863 回答
0

你有多少 CPU/内核?这段代码的一个问题是主线程在“while(!allThreadsT​​erminated)”循环中花费 CPU 时间,主动检查线程状态。如果你有一个 CPU - 你是在浪费它,而不是进行实际的排序。

将while循环替换为:

 for(counter=0; counter<files.length; counter++) {
        threadList[counter].join();
 }
于 2015-06-07T08:06:53.793 回答
0

您应该使用 Stream 和标准排序:

static String[] sort(File[] files, boolean parallel) {
    return (parallel ? Stream.of(files).parallel() : Stream.of(files))
        .flatMap(f -> {
            try {
                return Files.lines(f.toPath());
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        })
        .sorted()
        .toArray(String[]::new);
}

static String[] sort(File[] files) {
    return sort(files, false);
}

static String[] threadSort(File[] files) {
    return sort(files, true);
}

在我的环境threadSort中更快。

sort:
files=511 sorted lines=104419 elapse=4784ms
threadSort:
files=511 sorted lines=104419 elapse=3060ms
于 2015-06-07T08:18:16.660 回答
0

您可以使用java.util.concurrent.ExecutorServicewhich 将在指定数量的线程中运行所有任务,一旦所有线程完成执行,您将获得一个列表Future对象,该对象将保存每个线程执行的结果。Future 对象列表的顺序与您将 Callable 对象插入其列表中的顺序相同。

首先你需要有你的SortingThread实现Callable接口,这样你就可以得到每个线程执行的结果。
每个Callable对象都必须实现该call()方法,其返回类型将是您的Future对象。

    public class SortingThread implements Callable<String[]> {
    String[] data;
    SortingThread(String[] data) {
        this.data = data;
    }
    @Override
    public String[] call() throws Exception {
        data = MergeSort.mergeSort(data);
        return data;
    }  
   }

接下来你需要的是ExecutorSerivce用于线程管理。

public static String[] sortingExampleWithMultiThreads(File[] files) throws IOException {
      String sortedData[] = new String[0]; 
      int counter = 0; 
      boolean allThreadsTerminated = false;
      SortingThread[] threadList = new SortingThread[files.length];
      ArrayList<Callable<String[]>> callableList = new ArrayList<Callable<String[]>>();
      for (File file : files) {
          String[] data = getData(file);
          callableList.add(new SortingThread(data));  //Prepare a Callable list which would be passed to invokeAll() method.
          counter++;
      }

      ExecutorService service = Executors.newFixedThreadPool(counter);  // Create a fixed size thread pool, one thread for each file processing...
      List<Future<String[]>> futureObjects = service.invokeAll(callableList);  //List of what call() method of SortingThread is returning...

      for(counter=0; counter<files.length; counter++) {
          sortedData = MergeSort.merge(sortedData, futureObjects.get(counter));
      }
      return sortedData;
 }

这样你就可以避免使用已知会增加 CPU 利用率(因此速度降低)的 WHILE 循环,如果你有单核 CPU,那么它可以达到 100% 的利用率,如果是双核则可以达到 50%。
此外,ExecutorService在处理多线程而不是开发者启动和监视线程以获取结果时,使用线程管理是更好的方法。因此,您可以期待性能。

我还没有运行它,所以你可能需要在这里和那里进行更改,但我已经强调了你的方法。

PS:在测量性能时,为了得到整洁和精确的结果,每次运行时总是创建一个新的 JVM 实例。

于 2015-06-07T09:48:03.667 回答