3

我对并行性和并发性很陌生,我正在尝试使用 Java 中的 Fork-Join 实现中值过滤算法。基本上,我将输入文件读入 ArrayList 并使用该列表生成过滤中位数的新 ArrayList(包括原始 ArrayList 的第一个和最后一个元素)。

现在我设法制作了算法的串行/顺序版本,它工作正常。但是,当我尝试制作 Fork-Join 版本时,它似乎不适用于大型 ArrayLists(100000+)。我用一个大小为 5 的非常小的 ArrayList 进行了尝试,它工作正常。我似乎无法找到我的错误(我确定这是一个逻辑错误和/或实现错误)。任何帮助,将不胜感激。

这是顺序算法片段:

    //Add first boundary element to output ArrayList
    outputElements.add(this.elements.get(0));

    //Start Filter Algorithm 
    while(elements.size()-counter >= filterSize){
        for(int i = 0; i<filterSize; i++){
            tempElements.add(this.elements.get(i+counter));
            if(i==filterSize){
                break;
            }
        }

        Collections.sort(tempElements);
        outputElements.add(tempElements.get((filterSize-1)/2));

        counter++;
        tempElements.clear();
    }

    //Add last boundary element to output ArrayList.
    if (elements != null && !elements.isEmpty()) {
        outputElements.add(elements.get(elements.size()-1));
    }//End Filter Algorithm

这是我制作的并行课程。这是不起作用的部分:

public class Parallel extends RecursiveTask<List<Float>>{
    int lo;
    int hi;
    int filterSize;
    String outFile; //Output file name.
    int arraySize;
    List<Float> elements = new ArrayList<Float>();
    List<Float> tempElements = new ArrayList<Float>();
    List<Float> outputElements = new ArrayList<Float>();
    int counter = 0;
    static final int SEQUENTIAL_CUTOFF=1000;

    public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
        this.elements = elements;
        this.outFile = outFile;
        this.filterSize = filterSize;       
        if(lo == 0){
            outputElements.add(this.elements.get(0));
        }
    }
    @Override
    protected List<Float> compute() {
        long startTime = System.nanoTime(); //Algorithm starts here 
        if((hi-lo) < SEQUENTIAL_CUTOFF) {
            while(hi-counter >= filterSize){
                for(int i = lo; i<filterSize; i++){
                    tempElements.add(this.elements.get(i+counter));
                    if(i==filterSize){
                        break;
                    }
                }               
                Collections.sort(tempElements);
                outputElements.add(tempElements.get((filterSize-1)/2));
                counter++;
                tempElements.clear();
                return outputElements;
            }
          }else{              
              Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2));
              Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi);
              left.fork();

              List<Float> leftArr = new ArrayList<Float>();
              List<Float> rightArr = new ArrayList<Float>();

             rightArr =  right.compute();
             leftArr = left.join();

             List<Float> newList = new ArrayList<Float>();
             newList.addAll(leftArr);
             newList.addAll(rightArr);       

          }
        long endTime = System.nanoTime();//Algorithm ends here.

        //Write elements to output file 
        PrintWriter writeOutput = null;
        try {
            writeOutput = new PrintWriter(this.outFile, "UTF-8");
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        writeOutput.println(outputElements.size());//Number of lines
        for(int i=0; i<outputElements.size();i++){
            writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written
        }

        writeOutput.close(); //Close when output finished writing.
        System.out.println("Parallel complete");
        return null;
    }
}

非常感谢任何帮助。在花了几个小时并围绕 SO 和 Google 进行了大量研究后,我无法做到这一点。

编辑:musical_coder 建议发布我面临的错误,它们就在这里。这是很多错误:

Exception in thread "main" java.lang.IndexOutOfBoundsException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
    at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
    at main.main(main.java:45)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at Parallel.compute(Parallel.java:44)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:1)
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
4

1 回答 1

0

一般来说,你应该避免ArrayList在多线程代码中使用 s,因为它不是线程安全的:

请注意,此实现不同步。如果多个线程ArrayList同时访问一个实例,并且至少有一个线程在结构上修改了列表,则必须在外部进行同步。

我在您发布的片段中看不到任何同时修改列表的内容,但我确实看到您传递this.elements给子Parallel实例,这意味着您至少在做一些有风险的事情(在它们之间共享指向可变、非线程安全对象的指针线程)。

作为第一遍,this.elements = elements;在您的Parallel构造函数中替换为以下内容:

this.elements = Collections.unmodifiableList(elements);

通过使列表不可修改,您将确保如果您的Parallel代码试图改变列表,您将在故障点得到一个明确的错误。这不会阻止其他东西,除了Parallel修改原始elements列表之外,但它是一种快速、简单的验证Parallel行为是否正确的方法。如果你得到一个UnsupportedOperationException,你的Parallel类将需要重新设计——你不能ArrayList同时修改一个。

如果您没有得到UnsupportedOperationException,则其他东西正在修改您的列表。您需要找到并删除它。


一旦你弄清楚是什么导致你的列表同时发生变化,你就可以尝试确定最好的前进方式。通过所有“正确”的方式在线程之间共享数据超出了我希望在这个答案中涵盖的范围,但这里有一些一般的经验法则:

  • 避免可变数据结构- 将您的Parallel类设计为仅处理来自不可变数据结构的数据,例如Guava 的ImmutableList. 不可变数据结构默认是线程安全的。
  • 使用线程安全的数据结构——例如,ConcurrentLinkedQueue是多个进程读取和写入同一数据结构的线程安全方式。 ConcurrentHashMap是另一个常用的类。你需要什么取决于你想要做什么,但这些都是很好的起点。
  • 最小化并发操作的范围——即使是并发数据结构,你的总体目标应该是让每个任务独立运行,除了从共享源读取和写入共享接收器。对仅对一个线程可见的对象做尽可能多的工作。
  • 同步- 我注意到在没有任何显式同步的情况下Parallel写入。outFile这是危险的,并且可能会引入问题(崩溃或更严重的数据损坏)。一次只能有一个线程写入文件。通过使用专用的文件写入线程或显式同步文件写入操作来做到这一点。
于 2015-09-11T23:28:13.487 回答