1

我是 Java 新手,正在尝试编写一种方法来查找 2D 长数组中的最大值。

该方法在单独的线程中搜索每一行,并且线程保持共享的当前最大值。每当一个线程发现一个大于它自己的局部最大值的值时,它就会将该值与共享的局部最大值进行比较,并酌情更新其当前的局部最大值和可能的共享最大值。我需要确保实现适当的同步,以便无论计算如何交错,结果都是正确的。

我的代码冗长而凌乱,但对于初学者来说,我有这个功能:

   static long sharedMaxOf2DArray(long[][] arr, int r){

     MyRunnableShared[] myRunnables = new MyRunnableShared[r];
     for(int row = 0; row < r; row++){
       MyRunnableShared rr = new MyRunnableShared(arr, row, r);
       Thread t = new Thread(rr);
       t.start();
       myRunnables[row] = rr;
     }

     return myRunnables[0].sharedMax; //should be the same as any other one (?)

   }

对于改编的可运行文件,我有这个:

   public static class MyRunnableShared implements Runnable{
     long[][] theArray; 
     private int row; 
     private long rowMax; 
     public long localMax; 
     public long sharedMax; 
     private static Lock sharedMaxLock = new ReentrantLock(); 
     MyRunnableShared(long[][] a, int r, int rm){
        theArray = a; 
        row = r;
        rowMax = rm;
      }
      public void run(){
        localMax = 0;
        for(int i = 0; i < rowMax; i++){
          if(theArray[row][i] > localMax){
            localMax = theArray[row][i];
            sharedMaxLock.lock();
            try{
              if(localMax > sharedMax)
                sharedMax = localMax;
            }
            finally{
              sharedMaxLock.unlock(); 
            }
          } 
        }
      }
    }

我认为这种锁的使用将是一种防止多个线程sharedMax一次搞乱的安全方法,但是在测试/比较同一输入上的非并发最大值查找函数时,我发现结果不正确. 我在想问题可能来自我刚才说的事实

...
t.start();
myRunnables[row] = rr; 
...

sharedMaxOf2DArray函数中。也许给定线程需要在我将它放入 myRunnables 数组之前完成;否则,我将“捕获”错误的 sharedMax?或者是别的什么?我不确定事情的时间安排..

4

4 回答 4

1

来自 JavaDocs:

公共接口可调用

返回结果并可能引发异常的任务。实现者定义了一个没有参数的方法,称为 call。

Callable 接口与 Runnable 类似,两者都是为实例可能由另一个线程执行的类设计的。但是,Runnable 不返回结果,也不能抛出检查异常。

好吧,您可以使用 Callable 从一个 1darray 计算结果,然后等待 ExecutorService 结束。您现在可以比较 Callable 的每个结果以获取最大值。代码可能如下所示:

Random random = new Random(System.nanoTime());
long[][] myArray = new long[5][5];
for (int i = 0; i < 5; i++) {
    myArray[i] = new long[5];
    for (int j = 0; j < 5; j++) {
        myArray[i][j] = random.nextLong();
    }
}

ExecutorService executor = Executors.newFixedThreadPool(myArray.length);
List<Future<Long>> myResults = new ArrayList<>();
// create a callable for each 1d array in the 2d array
    for (int i = 0; i < myArray.length; i++) {
        Callable<Long> callable = new SearchCallable(myArray[i]);
    Future<Long> callResult = executor.submit(callable);
    myResults.add(callResult);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
}
// now compare the results and fetch the biggest one
long max = 0;
for (Future<Long> future : myResults) {
    try {
        max = Math.max(max, future.get());
    } catch (InterruptedException | ExecutionException e) {
        // something bad happend...!
        e.printStackTrace();
    }
}
System.out.println("The result is " + max);

你的可调用对象:

public class SearchCallable implements Callable<Long> {

    private final long[] mArray;

    public SearchCallable(final long[] pArray) {
        mArray = pArray;
    }

    @Override
    public Long call() throws Exception {
        long max = 0;
        for (int i = 0; i < mArray.length; i++) {
            max = Math.max(max, mArray[i]);
        }
        System.out.println("I've got the maximum " + max + ", and you guys?");
        return max;
    }

}
于 2012-12-06T07:14:26.457 回答
1

我不确定这是否是拼写错误,但您的Runnable实现声明sharedMax为实例变量:

public long sharedMax;

而不是共享的:

public static long sharedMax;

在前一种情况下,每个 Runnable 都有自己的副本,不会“看到”其他人的值。将其更改为后者应该会有所帮助。或者,将其更改为:

public long[] sharedMax; // array of size 1 shared across all threads

现在您可以在循环外创建一个大小为 1 的数组,并将其传递给每个 Runnable 以用作共享存储。

顺便说一句:请注意,由于每个线程都会通过为其循环的每次迭代持有一个锁来检查公共sharedMax值,因此会有巨大的锁争用。这可能会导致性能不佳。您必须进行测量,但我推测让每个线程找到最大行然后运行最后一遍以找到“最大值的最大值”实际上可能具有可比性或更快。

于 2012-12-06T07:39:06.697 回答
1

您的代码存在严重的锁争用和线程安全问题。更糟糕的是,它实际上并没有等待任何线程完成,return myRunnables[0].sharedMax这是一个非常糟糕的竞争条件。此外,通过ReentrantLock甚至synchronized块使用显式锁定通常是错误的做事方式,除非您正在实现一些低级别的东西(例如您自己的/新的并发数据结构)

这是一个使用Future并发原语和ExecutorService处理线程创建的版本。总体思路是:

  1. 提交多个并发作业到您的ExecutorService
  2. Future返回的 backed from添加submit(...)List
  3. 循环调用get()每个列表Future并聚合结果

这个版本还有一个额外的好处,即工作线程之间没有锁争用(或一般的锁定),因为每个线程只返回其数组切片的最大值。

import java.util.concurrent.*;
import java.util.*;

public class PMax {
    public static long pmax(final long[][] arr, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<Long>> list = new ArrayList<Future<Long>>();
            for(int i=0;i<arr.length;i++) {
                // put sub-array in a final so the inner class can see it:
                final long[] subArr = arr[i];
                list.add(pool.submit(new Callable<Long>() {
                    public Long call() {
                        long max = Long.MIN_VALUE;
                        for(int j=0;j<subArr.length;j++) {
                            if( subArr[j] > max ) {
                                max = subArr[j];
                            }
                        }
                        return max;
                    }
                }));
            }
            // find the max of each slice's max:
            long max = Long.MIN_VALUE;
            for(Future<Long> future : list) {
                long threadMax = future.get();
                System.out.println("threadMax: " + threadMax);
                if( threadMax > max ) {
                    max = threadMax;
                }
            }
            return max;
        } catch( RuntimeException e ) {
            throw e;
        } catch( Exception e ) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String args[]) {
        int x = 1000;
        int y = 1000;
        long max = Long.MIN_VALUE;
        long[][] foo = new long[x][y];
        for(int i=0;i<x;i++) {
            for(int j=0;j<y;j++) {
                long r = (long)(Math.random() * 100000000);
                if( r > max ) {
                    // save this to compare against pmax:
                    max = r; 
                }
                foo[i][j] = r;
            }
        }
        int numThreads = 32;
        long pmax = pmax(foo, numThreads);
        System.out.println("max:  " + max);
        System.out.println("pmax: " + pmax);
    }
}

奖励:ExecutorService如果您重复调用此方法,那么将创建从方法中提取出来并在调用之间重用它可能是有意义的。

于 2013-05-02T17:21:20.253 回答
0

好吧,这绝对是一个问题——但如果没有更多代码,就很难理解它是否是唯一的东西。

在访问(以及对 的读取)和修改其他线程之间基本上存在竞争条件。thread[0]sharedMaxsharedMax

想想如果调度程序决定暂时不让任何线程运行会发生什么 - 所以当您完成创建线程时,您将返回答案而不修改它!(当然还有其他可能的情况……)

join()您可以通过在返回答案之前 ing 所有线程来克服它。

于 2012-12-06T05:55:14.113 回答