我有各种不同的算法,每种算法都需要大量资源,并且每种算法都必须在数百万个输入上进行处理。我想将输入分成块,然后并行处理这些块,最后将结果以正确的顺序组装到一个输出数组中。
我一直在研究这件事,共识似乎是我应该使用ExecutorService
and arraycopy()
。但是,我不确定如何确定要创建的最佳线程数,也不知道如何以消除错误风险的方式构造代码。如果我知道每个线程在创建其结果数组后被终止,那就太好了。最后,我在下面编写的代码也给了我一个空指针错误。
你们中的一些人能否编辑下面的代码,以便它尽快实现我的上述目标,同时消除错误的风险?如果下面的代码可以在 5 或 10 毫秒内运行,那就太好了。数组中的随机数只不过是作为比较线程选项的基准的占位符。我不需要优化随机数生成,因为我的实际算法与随机数生成无关。这是我正在进行的工作:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelArrays {
private static final int numJobs = 5;
static int numElements = 1473200;
static int blockSize = 300000;
static int remaining;
String number;
static int[][] data2D = new int[numJobs][];
static int[] data1D;
static int size;
static int currIdx;
public static void main(String args[]) {
long startTime = System.currentTimeMillis();
remaining = numElements-blockSize;
// create a pool of threads, 10 max jobs will execute in parallel
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// submit jobs to be executing by the pool
for (int i = 0; i < numJobs; i++) {
currIdx = i;
System.out.println("This coming iteration would leave us with remaining, blockSize: "+remaining+", "+blockSize);
if(remaining>=0){System.out.println("blockSize is: "+blockSize);}
else{
blockSize = (blockSize+remaining);
remaining = 0;
System.out.println("else blockSize is: "+blockSize);
}
System.out.println("After iteration, remaining, blockSize are: "+remaining+", "+blockSize);
threadPool.submit(new Runnable() {
public void run() {
Random r = new Random();
data2D[currIdx] = new int[blockSize];
for(int j=0;j<data2D[currIdx].length;j++){
data2D[currIdx][j] = r.nextInt(255)*r.nextInt(255)*r.nextInt(255);
}
}
});
remaining -= blockSize;
}
//Now collapse data2D into a 1D array
data1D = new int[numElements];
int startPos = 0;
for(int k=0;k<numJobs;k++){
System.out.println("startPos is: "+startPos);
//arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
System.out.println("k is: "+k);
System.out.println("data2D[k].length is: "+data2D[k].length);
System.arraycopy(data2D[k], 0, data1D, startPos, data2D[k].length);
startPos += data2D[k].length;
}
threadPool.shutdown();
System.out.println("Main thread exiting.");
long endTime = System.currentTimeMillis();
System.out.println("Elapsed time is: "+(endTime-startTime));
}
}
第二次编辑:
为了响应 Ralf H 的建议,我将我的代码编辑如下。它仍然抛出相同的空指针异常,我将在下面再次包含它。我非常感谢重写此代码以使其正确运行而不会引发空指针异常的任何帮助:
package myPackage;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelArrays {
private final static int numJobs = 5;
static int numElements = 1473200;
static int blockSize = 300000;
static int remaining;
String number;
static int[][] data2D = new int[numJobs][];
static int[] data1D;
// static int size;
static int currIdx;
static int numAdded = 0;
public static void main(String args[]) {runAlgorithm();}
static void runAlgorithm(){
long startTime = System.currentTimeMillis();
remaining = numElements-blockSize;
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < numJobs; i++) {// submit jobs to be executing by the pool
currIdx = i;
if(remaining<0){//last block will be smaller than the rest
blockSize = (blockSize+remaining);
remaining = 0;
}
final int fCurrIdx = i;
threadPool.submit(new Runnable() {
public void run() {
Random r = new Random();
data2D[fCurrIdx] = new int[blockSize];
System.out.println("fCurrIdx is: "+fCurrIdx);
for(int j=0;j<data2D[fCurrIdx].length;j++){
data2D[fCurrIdx][j] = r.nextInt(255)*r.nextInt(255)*r.nextInt(255);
}
numAdded += 1;
}
});
remaining -= blockSize;
}
//Now collapse data2D into a 1D array
data1D = new int[numElements];
System.out.println("numAdded, data2D.length is: "+numAdded+", "+data2D.length);
int startPos = 0;
for(int k=0;k<numJobs;k++){
System.out.println("startPos is: "+startPos);
//arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
System.out.println("k, data2D["+k+"].length are: "+k+", "+data2D[k].length); // NullPointerException here
System.arraycopy(data2D[k], 0, data1D, startPos, data2D[k].length);
startPos += data2D[k].length;
}
threadPool.shutdown();
System.out.println("Main thread exiting.");
long endTime = System.currentTimeMillis();
System.out.println("Elapsed time is: "+(endTime-startTime));
}
}
这是修改后的代码引发的空指针错误的堆栈跟踪:
Exception in thread "main" java.lang.NullPointerException
at myPackage.ParallelArrays.runAlgorithm(ParallelArrays.java:52)
at myPackage.ParallelArrays.main(ParallelArrays.java:19)
我认为问题在于代码需要使用 Future 对象和ExecutorService
. 但我不确定这个特定代码的语法。