-1

我正在尝试使用Java中的ForkJoinPool处理图像。我使用流对图像进行了一些自定义操作。我正在尝试将ForkJoinPool用于getRGBsetRGB方法。如何实现getRGB方法的并行性?

    @Override
    public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) {

        int[][] sol = new int[h][w];

        int threshold = w;

        class RecursiveSetter extends RecursiveAction {
            int from;
            int to;
            FJBufferedImage image;

            RecursiveSetter(int from, int to, FJBufferedImage image) {
                this.from = from;
                this.to = to;
                this.image = image;
            }

            @Override
            protected void compute() {
                System.out.println("From : " + from + " To : " + to);
                if (from >= to) return;

                if (to - from == 1) {
                    computeDirectly(from);
                    return;
                } else {
                    int mid = from + (to - from) / 2;
                    System.out.println("From : " + from + " To : " + to +
                            "Mid :" + mid);
                    invokeAll(
                            new RecursiveSetter(from, mid, image),
                            new RecursiveSetter(mid + 1, to, image));
                    return;
                }
            }

            void computeDirectly(int row) {
                sol[from] = image.getRealRGB(from, 0, w, 1, null, offset,
                        scansize);

            }
        }

        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        pool.invoke(new RecursiveSetter(0, h-1, this));
        return Arrays.stream(sol)
                .flatMapToInt(Arrays::stream)
                .toArray();
    }

getRealRGB方法的唯一代理BufferedImage。我知道这可能不切实际,但我只想知道如何在这种情况下使用ForkJoinPool 。是的,上面的代码正在抛出ArrayIndexOutOfBound异常。请就如何拆分工作负载(行与列与小网格。现在,我正在按行拆分)以及如何确定阈值提出建议。

4

1 回答 1

3

首先是关于您的尝试的一些评论:

int[][] sol = new int[h][w];

在这里,您正在创建一个二维数组,它在 Java 中是一个一维数组,其元素类型int[]已经填充了该int[]类型的子数组。由于您将使用 覆盖元素,因此sol[from] = /* something returning an int[] array */分配这些子数组已过时。所以在这种情况下,你应该使用

int[][] sol = new int[h][];

反而。但是认识到外部数组的一维性质也可以让我们认识到一个简单的流式解决方案可以完成这项工作,即

int[][] sol = IntStream.range(yStart, yStart+h)
    .parallel()
    .mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize))
    .toArray(int[][]::new);

这已经完成了在可用内核上分配工作负载的工作。它在幕后使用 Fork/Join 框架,就像您尝试做的那样,但这是一个实现细节。您可以将其与下一个流操作融合,例如

return IntStream.range(yStart, yStart+h)
    .parallel()
    .flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize)))
    .toArray();

但是,如果我正确理解了方法签名,您实际上想要这样做

public int[] getRGB(
       int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) {

    final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize];
    IntStream.range(yStart, yStart+h).parallel()
        .forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+y*scansize, scansize));
    return result;
}

履行合同。这也最大限度地减少了复制操作的次数。由于每个查询写入数组的不同区域,直接写入目标数组是线程安全的。

这保留了仅拆分行范围的策略。行的子拆分是可能的,但更复杂,而且很少得到回报。它只会在调用者请求很少的行但每行有很多值的极端情况下有所帮助。但即便如此,由于内存局部性问题,尚不清楚复杂的子行拆分是否会得到回报。


关于您最初的问题,如果您ForkJoinTask直接实现 a ,您可以使用getSurplusQueuedTaskCount()来决定是再次拆分还是直接计算。

阈值的选择是由于必须同步的任务对象数量和核心利用率而导致的开销之间的权衡。如果工作负载可以完美平衡地拆分,并且没有其他不相关的线程或进程使用 CPU 时间,那么每个内核只有一个项目将是完美的。但在实践中,这些任务永远不会在完全相同的时间运行,因此需要一些空闲的拆分任务由首先完成的内核执行。一个典型的阈值介于 1 或 3 之间(请记住,这是每个核心的队列任务数),对于您的任务类型,具有非常均匀的工作负载,可以使用较小的数字,例如,一旦有另一个队列项就停止拆分。

于 2016-12-01T11:47:09.977 回答