2

我目前正在开发一个同时对字符串进行排序的程序。我的程序接收一个文件,将文件的每一行读入一个数组,然后将字符串数组拆分为更小的字符串数组。然后程序为每个较小的数组启动一个线程,并对它们进行快速排序。一旦每个线程完成对其数组的排序,主线程就会从线程对象中收集所有结果。然后应该将较小的、现已排序的数组合并为一个大的、已排序的数组。

我目前已经使用单线程合并排序将快速排序线程返回的排序数组嵌套在一起解决了这个问题。现在的问题是,由于合并不会同时发生,因此使用少量线程(1-4)对文件进行排序实际上使程序排序尽可能快。如果我稍微增加线程数(比如 15 个线程),程序实际上运行的速度要比使用更少线程的速度慢很多。为了解决这个问题,我希望在我的合并排序/数组嵌套中引入并发性。

我想做的是:一旦两个线程完成了对 in-file 的部分的快速排序,一个新线程会将这两个部分嵌套在一起,直到 in-file 的每个部分都已排序。

非常感谢每一点帮助,我感谢示例代码和/或伪代码。提前致谢!:)


对数组进行排序的当前代码:

public synchronized String[] sort(){
    String[] sortedWords = new String[words.length];
    SortingThread[] sts = new SortingThread[threads];

    for(int i = 0; i < threads; i++){
        sts[i] = new SortingThread(this, splitWords[i]);
    }

    for(SortingThread st : sts){
        st.start();
    }

    for(SortingThread st : sts){
        try {
            st.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    indexes = new int[sts.length];

    for(int i = 0; i < indexes.length; i++){
        indexes[i] = 0;
    }


//This is where my merge-sorting currently starts.

    ArrayList<String> toAddTo = new ArrayList<String>();

    while(!allIndexesHaveBeenRead(sts)){
        String globalMinimum = null;
        int globalMinThread = -1;
        currentIteration: for (int i = 0; i < sts.length; i++) {
            String current;
            try{
                current = sts[i].sorted[indexes[i]];
            } catch(Exception e){
                continue currentIteration;
            }
            try{
                if(globalMinimum == null){
                    globalMinimum = current;
                    globalMinThread = i;
                }
                else if(current.compareTo(globalMinimum) < 0){
                    globalMinimum = current;
                    globalMinThread = i;
                }
            } catch (NullPointerException e){
                continue;
            }
        }
        toAddTo.add(globalMinimum);
        indexes[globalMinThread]++;
    }

    sortedWords = toAddTo.toArray(sortedWords);

    int len = 0;
    for (int i = 0; i < sortedWords.length; i++) {
        if(sortedWords[i] != null){
            len++;
        }
    }

    String[] toReturn = new String[len];

    for (int i = 0; i < toReturn.length; i++) {
        toReturn[i] = sortedWords[i];
    }

    return toReturn;
}
4

2 回答 2

1

你的问题场景是这样的

  • 一个主线程需要完成 N 个任务
  • 它从一个池中产生 M 个线程并处理 N 个任务
  • 它等待至少一个线程完成任务并对结果进行处理
  • 继续处理结果,直到完成所有 N 个任务

Java 5 中的 CompletionService,它完全符合要求,

这是您的问题陈述的解决方案,

 public class Sorter implements Callable<List<String>> {

    private List<String> data;

public Sorter(List<String> input) {
    data = input;
}

@Override
public List<String> call() throws Exception {
    Collections.sort(data);
    return data;
}

 }

而在主课中,

  CompletionService service = new  ExecutorCompletionService(Executors.newFixedThreadPool(5));

    List<String> result = new ArrayList<String>();

    String readline = null;
    Callable<List<String>> sorter = null;
    String[] words = null;
    int noOfRunningFutures = 0;

     while ((readline = br.readLine()) != null) {
        words = readline.split(" ");
        List<String> input = Arrays.asList(words);
        sorter = new Sorter(input);

        service.submit(sorter);

        // add them to the number of futures which I am creating - to keep track of the Queue length
        noOfRunningFutures ++;
    }


    while (noOfRunningFutures > 0) 
    {
        try {

            // this is a blocking call - whenever there is a worker which is already completed
            // then it is fetched from the Queue                 
            Future<List<String>> completed = service.take();
            noOfRunningFutures --;

            // get the value from computed from the Future
            List<String> sorted =  completed.get();

            result.addAll(sorted);

            Collections.sort(result);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

希望这对您有所帮助。

于 2013-05-12T21:02:27.623 回答
1

我在 SourceForge 上管理一个 fork-join 项目 TymeacDSE,这正是您正在寻找的。它对子集进行排序,然后将子集的组合并到一个最终数组中。看看这里

于 2013-05-13T13:11:57.040 回答