10

通常,当使用Java 8 的parallelStream() 时,结果是通过默认的通用fork-join 池(即ForkJoinPool.commonPool())执行。

然而,这显然是不可取的,如果一个人的工作远离 CPU 限制,例如,可能大部分时间都在等待 IO。在这种情况下,人们会希望使用一个单独的池,根据其他标准(例如任务可能实际使用 CPU 的时间)来确定大小。

没有明显的方法可以让 parallelStream() 使用不同的池,但是这里有一种详细的方法。

不幸的是,这种方法需要从 fork-join 池线程调用并行流上的终端操作。这样做的缺点是,如果目标分叉连接池完全忙于现有工作,则整个执行将等待它,而完全什么也不做。因此,池可能成为比单线程执行更严重的瓶颈。相比之下,当以“正常”方式使用 parallelStream() 时,ForkJoinPool.common.externalHelpComplete() 或 ForkJoinPool.common.tryExternalUnpush() 用于让来自池外的调用线程帮助处理。

有谁知道一种方法可以parallelStream() 使用非默认的fork-join 池,并让来自fork-join 池外部的调用线程帮助处理这项工作(但不是fork-的其余部分-加入游泳池的工作)?

4

1 回答 1

2

您可以awaitQuiescence在游泳池上使用以提供帮助。但是,您无法选择要帮助的任务,它只会从池中获取下一个待处理的任务,因此,如果有更多待处理的任务,您可能最终会在执行这些任务之前执行这些任务。

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
// make all threads busy:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone()) // use zero timeout to execute one task only
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

将打印true

然而

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
// make all threads busy:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// overload:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone())
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

当它尝试执行第二个阻塞任务时将永远挂起。

尽管如此,它将让启动线程帮助处理池的待处理任务,这将提高它自己的任务被执行的机会,只要没有无限的任务(上面的例子是极端的,只是为了演示而选择的)。


但请注意,Fork/Join 框架和StreamAPI 之间的整个关系无论如何都是一个实现细节。

于 2015-10-01T18:17:00.723 回答