
Work stealing algorithm [closed]

I have read a lot about this algorithm. It is usually explained in these words:

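Those forked subtasks can recursively create more subtasks themselves and thereby fill up the work queues of the parallel worker threads. If a thread finishes and has nothing more to do, it can "steal" work from the queue of another thread.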
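To make the quoted description concrete, here is a minimal sketch of the fork/join idiom it refers to, using `ForkJoinPool` and `RecursiveTask` from `java.util.concurrent`. The class names `ForkJoinSumSketch` and `SumTask`, the helper `sum`, and the parallelism of 5 are assumptions chosen for illustration; they are not part of the program in this question.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

public class ForkJoinSumSketch {

    // Hypothetical task: sums array[from, to) by splitting the range in half.
    static class SumTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 2;
        private final int[] array;
        private final int from;
        private final int to;

        SumTask(int[] array, int from, int to) {
            this.array = array;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: sum directly on the current worker.
                int sum = 0;
                for (int i = from; i < to; i++) {
                    sum += array[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(array, from, mid);
            SumTask right = new SumTask(array, mid, to);
            // fork() pushes the task onto the current worker's own queue;
            // an idle worker may take ("steal") it from there.
            left.fork();
            int rightSum = right.compute(); // keep working on the other half
            return left.join() + rightSum;
        }
    }

    // Runs the task in a dedicated pool (parallelism 5 is an assumption).
    static int sum(int[] array) {
        ForkJoinPool pool = new ForkJoinPool(5);
        try {
            return pool.invoke(new SumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] myArray = IntStream.rangeClosed(1, 10).toArray();
        System.out.println(sum(myArray)); // prints 55
    }
}
```

In this sketch a worker that forks keeps computing one half itself instead of blocking, which is the usual reason to prefer `fork()`/`join()` over submitting subtasks and waiting on their futures.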

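I understand that this algorithm is new and is not used by Executors.newCachedThreadPool / Executors.newFixedThreadPool.
I expected to see each thread process only its own queue. I wrote a small program that recursively creates subtasks (see below). How can I see that it does not use the work-stealing algorithm?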

public static void main(String[] args) throws Exception {
    int[] myArray = IntStream.rangeClosed(1, 10).toArray();

    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CustomCallable customCallable = new CustomCallable(myArray, executorService);
    customCallable.call();
    executorService.shutdownNow();
}
public class CustomCallable implements Callable<Integer> {
    private static final int THRESHOLD = 2;
    private int[] array;
    private ExecutorService executorService;

    public CustomCallable(int[] array, ExecutorService executorService) {
        this.array = array;
        this.executorService = executorService;
    }

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        log.debug(" start [{}] ", Arrays.toString(array) );
        if (array.length > THRESHOLD) {
            List<Callable<Integer>> dividedTasks = createSubtasks(array, executorService);
            sum = executorService.invokeAll(dividedTasks).stream()
                    .mapToInt(future -> {
                        try {
                            return future.get();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                        return 0;
                    })
                    .sum();
        } else {
            sum = processing(array);
        }
        log.debug(" sum[{}]={} ", Arrays.toString(array) ,sum);
        return sum;
    }

    private List<Callable<Integer>> createSubtasks(int[] array, ExecutorService executorService) {
        int[] arr1 = Arrays.copyOfRange(array, 0, array.length / 2);
        int[] arr2 = Arrays.copyOfRange(array, array.length / 2, array.length);
        List<Callable<Integer>> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomCallable(arr1, executorService));
        dividedTasks.add(new CustomCallable(arr2, executorService));
        return dividedTasks;
    }

    private Integer processing(int[] array) {
        int result = Arrays.stream(array)
                .sum();
        return result;
    }
}

Here is the output:

[main] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]  
[pool-2-thread-1] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5]]  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  start [[1, 2]]**  
[pool-2-thread-2] DEBUG com.example.CustomCallable -  start [[6, 7, 8, 9, 10]]  
[pool-2-thread-4] DEBUG com.example.CustomCallable -  start [[3, 4, 5]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[6, 7]]  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  sum[[1, 2]]=3**  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[6, 7]]=13  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  start [[8, 9, 10]]**  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[3]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[3]]=3  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[4, 5]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[4, 5]]=9  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[8]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[8]]=8  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[9, 10]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[9, 10]]=19  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  sum[[8, 9, 10]]=27**  
[pool-2-thread-4] DEBUG com.example.CustomCallable -  sum[[3, 4, 5]]=12  
[pool-2-thread-2] DEBUG com.example.CustomCallable -  sum[[6, 7, 8, 9, 10]]=40  
[pool-2-thread-1] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5]]=15  
[main] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]=55 


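As you can see:
main triggers thread 2 and thread 1
thread 1 triggers thread 3 (which computes [1, 2]) and thread 4
thread 2 triggers thread 5 and thread 3 (which computes [8, 9, 10] after it finishes [1, 2])
As I understand it, thread 3 handles work from both thread 1 and thread 2.
It looks like thread 3 is stealing work.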
If I change it to

 ExecutorService executorService = Executors.newWorkStealingPool();

which does support the work-stealing algorithm, what difference should I see?

[main] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]  
[ForkJoinPool-1-worker-1] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5]]  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  start [[1, 2]]  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  sum[[1, 2]]=3  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  start [[3, 4, 5]]  
[ForkJoinPool-1-worker-3] DEBUG com.example.CustomCallable -  start [[6, 7, 8, 9, 10]]  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  start [[6, 7]]  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  sum[[6, 7]]=13  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  start [[8, 9, 10]]  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[3]]**  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[3]]=3**  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[4, 5]]  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[4, 5]]=9  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[8]]**  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[8]]=8**  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[9, 10]]  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[9, 10]]=19  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  sum[[3, 4, 5]]=12  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  sum[[8, 9, 10]]=27  
[ForkJoinPool-1-worker-1] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5]]=15  
[ForkJoinPool-1-worker-3] DEBUG com.example.CustomCallable -  sum[[6, 7, 8, 9, 10]]=40  
[main] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]=55


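As you can see, we have 5 workers, and worker 4 handles work that stems from the tasks of worker 3 and worker 1. How is this different from the previous execution?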
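Thread names in a log may not be enough to tell the two pools apart, because a fixed thread pool also hands queued tasks to whichever thread becomes free, only from one shared queue. One way to observe stealing more directly is `ForkJoinPool.getStealCount()`, which returns an estimate kept by the pool. The sketch below assumes an explicitly created `ForkJoinPool` rather than the `ExecutorService` returned by `Executors.newWorkStealingPool()`; the names `StealCountSketch`, `BusyTask`, and `runAndCountSteals` are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class StealCountSketch {

    // Written by leaf tasks so the busy loop has an observable effect.
    static volatile long sink;

    // Hypothetical task: builds a binary tree of subtasks with busy leaves.
    static class BusyTask extends RecursiveAction {
        private final int depth;

        BusyTask(int depth) {
            this.depth = depth;
        }

        @Override
        protected void compute() {
            if (depth == 0) {
                long x = 0;
                for (int i = 0; i < 100_000; i++) {
                    x += i;
                }
                sink = x;
                return;
            }
            // Forks one subtask, runs the other, then joins both.
            invokeAll(new BusyTask(depth - 1), new BusyTask(depth - 1));
        }
    }

    // Runs the task tree and returns the pool's steal-count estimate.
    static long runAndCountSteals(int parallelism, int depth) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new BusyTask(depth));
            return pool.getStealCount();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The value is an estimate and varies from run to run.
        System.out.println("steal count: " + runAndCountSteals(5, 8));
    }
}
```

`ThreadPoolExecutor` has no comparable counter, since its worker threads all take tasks from the same queue.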

You can download the code from GitHub.
